
Real-time Kafka Message Consumption and MySQL Sink with Apache Flink

This tutorial explains how to consume Kafka messages in real time using Apache Flink and persist them into a MySQL database by adding the JDBC dependency, implementing a custom RichSinkFunction, and configuring a Flink job with a Kafka source and MySQL sink.

Big Data Technology & Architecture

This article demonstrates how to consume real-time messages from Kafka and write them into MySQL using Apache Flink.

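First, add the MySQL JDBC driver dependency to your Maven project. The job also needs Flink's Kafka connector (the flink-connector-kafka artifact matching your Flink release), which provides FlinkKafkaConsumer.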

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>

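Next, implement a custom RichSinkFunction (MysqlSink) that opens a JDBC connection, prepares a REPLACE INTO statement, and writes each Tuple3<Integer, String, Integer> (id, num, price) to the MySQL table. REPLACE INTO deletes any existing row whose primary key or unique index matches the new row and then inserts the new one, so give the table a primary key on id if each id should hold only its latest values.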

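import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSink extends RichSinkFunction<Tuple3<Integer, String, Integer>> {

  private transient Connection connection;
  private transient PreparedStatement preparedStatement;
  String username = "";
  String password = "";
  String drivername = ""; // configure your driver, e.g. com.mysql.jdbc.Driver for Connector/J 5.1
  String dburl = ""; // configure your DB URL

  // open() runs once per parallel sink instance: connect and prepare the statement here
  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Class.forName(drivername);
    connection = DriverManager.getConnection(dburl, username, password);
    // "my_table" is a placeholder; TABLE itself is a reserved word in MySQL and cannot be used unquoted
    String sql = "replace into my_table(id, num, price) values(?, ?, ?)";
    preparedStatement = connection.prepareStatement(sql);
  }

  // invoke() runs once per record: only bind the parameters and execute
  @Override
  public void invoke(Tuple3<Integer, String, Integer> value, Context context) throws Exception {
    preparedStatement.setInt(1, value.f0);
    preparedStatement.setString(2, value.f1);
    preparedStatement.setInt(3, value.f2);
    preparedStatement.executeUpdate();
  }

  // close() runs when the task shuts down: release the JDBC resources
  @Override
  public void close() throws Exception {
    if (preparedStatement != null) {
      preparedStatement.close();
    }
    if (connection != null) {
      connection.close();
    }
    super.close();
  }
}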
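Whatever table name you use, MySQL rejects reserved words such as TABLE as unquoted identifiers, so a statement like replace into table(id,num,price) fails with a syntax error. A minimal sketch, assuming you want to generate the statement text from a table name and a column list: the hypothetical helper ReplaceSql.build below wraps each identifier in backticks (MySQL's identifier quote character, with embedded backticks doubled) and emits one ? placeholder per column, so its result can be passed to connection.prepareStatement(...) unchanged.

```java
import java.util.StringJoiner;

/** Hypothetical helper: builds a MySQL REPLACE INTO statement with JDBC placeholders. */
final class ReplaceSql {

  private ReplaceSql() {}

  // Wrap an identifier in backticks, doubling any embedded backtick (MySQL quoting rule).
  static String quote(String identifier) {
    return "`" + identifier.replace("`", "``") + "`";
  }

  // Build "REPLACE INTO `t` (`a`, `b`) VALUES (?, ?)" for the given table and columns.
  static String build(String table, String... columns) {
    if (columns.length == 0) {
      throw new IllegalArgumentException("at least one column is required");
    }
    StringJoiner cols = new StringJoiner(", ", "(", ")");
    StringJoiner marks = new StringJoiner(", ", "(", ")");
    for (String column : columns) {
      cols.add(quote(column));
      marks.add("?"); // one JDBC placeholder per column
    }
    return "REPLACE INTO " + quote(table) + " " + cols + " VALUES " + marks;
  }
}
```

For example, ReplaceSql.build("table", "id", "num", "price") yields REPLACE INTO `table` (`id`, `num`, `price`) VALUES (?, ?, ?), which MySQL accepts even though the table is literally named table.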

Finally, create the Flink job main class (MysqlSinkTest) that sets up the StreamExecutionEnvironment, configures the Kafka consumer, parses incoming CSV strings into Tuple3 objects, and adds the MysqlSink to the data stream.

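import java.util.Properties;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MysqlSinkTest {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "mysql-sink-test"); // placeholder consumer group, used when committing offsets

    // Example data: 1,abc,100 (could also be complex JSON)
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
    // Restart the job up to 5 times, waiting 5000 ms between attempts
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));
    // Checkpoint every 2000 ms; the consumer commits Kafka offsets when a checkpoint completes
    env.enableCheckpointing(2000);

    DataStream<String> stream = env.addSource(consumer);
    DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream
        // drop null or blank messages
        .filter((FilterFunction<String>) value -> value != null && !value.trim().isEmpty())
        // parse "id,num,price" into a Tuple3
        .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
          String[] fields = value.split(",");
          return new Tuple3<>(Integer.valueOf(fields[0].trim()), fields[1].trim(), Integer.valueOf(fields[2].trim()));
        })
        // Java lambdas lose generic types to erasure, so declare the Tuple3 result type explicitly
        .returns(Types.TUPLE(Types.INT, Types.STRING, Types.INT));

    sourceStream.addSink(new MysqlSink());
    env.execute("data to mysql start");
  }
}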
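The map step calls Integer.valueOf on whatever arrives, so a single malformed message (too few fields, a non-numeric id or price) throws inside the task. With a fixed-delay restart strategy the job then restarts from the last checkpoint, reads the same message again, and fails again until the restart attempts run out. A minimal sketch of a defensive parser in plain Java, so it can be exercised outside Flink: the hypothetical CsvRowParser.parse returns an empty Optional for lines it cannot parse instead of throwing, and the hypothetical ParsedRow class stands in for Tuple3<Integer, String, Integer>.

```java
import java.util.Optional;

/** Hypothetical stand-in for Tuple3<Integer, String, Integer>: (id, num, price). */
final class ParsedRow {
  final int id;
  final String num;
  final int price;

  ParsedRow(int id, String num, int price) {
    this.id = id;
    this.num = num;
    this.price = price;
  }
}

/** Hypothetical parser for "id,num,price" lines that never throws on bad input. */
final class CsvRowParser {

  private CsvRowParser() {}

  static Optional<ParsedRow> parse(String line) {
    if (line == null || line.trim().isEmpty()) {
      return Optional.empty(); // same effect as the blank-message filter
    }
    // limit -1 keeps trailing empty fields, so "1,abc," is seen as three fields
    String[] fields = line.split(",", -1);
    if (fields.length != 3) {
      return Optional.empty(); // wrong number of fields
    }
    try {
      int id = Integer.parseInt(fields[0].trim());
      int price = Integer.parseInt(fields[2].trim());
      return Optional.of(new ParsedRow(id, fields[1].trim(), price));
    } catch (NumberFormatException e) {
      return Optional.empty(); // id or price is not an integer
    }
  }
}
```

In the job, a parser like this could replace the filter and map steps, for instance inside a FlatMapFunction that only emits rows for which parse returns a value, so bad messages are skipped (or logged) rather than restarting the job.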

Replace the placeholder configuration values (driver class, JDBC URL, username, password, and the table name in the SQL statement) with your own settings, then build and run the job. Each message sent to the test topic, such as 1,abc,100, should then show up as a row in the MySQL table in real time.
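Checkpointing every 2000 ms together with the restart strategy gives at-least-once delivery into this sink: after a failure, Flink rewinds the Kafka offsets to the last completed checkpoint and re-sends records the sink may already have written. Because REPLACE INTO overwrites the row with the same primary key, replaying those records leaves the table in the same final state, assuming id is the table's primary key. The sketch below models only that property in plain Java; the hypothetical UpsertTable uses a HashMap as a stand-in for the MySQL table and is an illustration of the semantics, not a test against a real database.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a MySQL table with PRIMARY KEY (id), written with REPLACE INTO. */
final class UpsertTable {

  /** One (id, num, price) record, mirroring Tuple3<Integer, String, Integer>. */
  static final class Rec {
    final int id;
    final String num;
    final int price;

    Rec(int id, String num, int price) {
      this.id = id;
      this.num = num;
      this.price = price;
    }

    @Override
    public String toString() {
      return id + "," + num + "," + price;
    }
  }

  // Key = primary key (id), value = the row's contents rendered as "id,num,price".
  private final Map<Integer, String> rowsById = new HashMap<>();

  // REPLACE INTO semantics: any row with the same key is removed, then the new row is inserted.
  void replace(Rec r) {
    rowsById.put(r.id, r.toString());
  }

  // Apply records in order, as the sink would for one Kafka partition.
  void replaceAll(List<Rec> records) {
    for (Rec r : records) {
      replace(r);
    }
  }

  // Copy of the current table contents, for comparison.
  Map<Integer, String> snapshot() {
    return new HashMap<>(rowsById);
  }
}
```

Writing a batch once, or writing it and then replaying its tail after a simulated failure, produces identical snapshots. With a plain INSERT into a table without a key, each replayed record would instead add a duplicate row.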

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.