
Using Flink CDC to Capture MySQL Changes and Sync Them to ClickHouse

This article introduces Change Data Capture (CDC), compares query‑based and log‑based approaches, explains Debezium and ClickHouse, and provides detailed Flink CDC and Flink SQL CDC examples—including Java source code, custom deserialization schema, ClickHouse sink implementation, and required Maven dependencies—to synchronize MySQL data into ClickHouse in real time.

Big Data Technology & Architecture

Introduction to Change Data Capture (CDC) and its typical application scenarios such as heterogeneous database synchronization, microservice data sharing, and cache or CQRS view updates.

Comparison table of query‑based CDC versus log‑based CDC, highlighting concepts, open‑source products, execution mode, latency, invasiveness, and support for delete events.
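The delete-event difference in that comparison follows from how each approach observes the table. The toy Java model below makes it concrete. It is an illustration only: the class names, the updatedAt column the poller relies on, and the in-memory list standing in for the binlog are all hypothetical, and nothing here connects to MySQL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: one in-memory "table" replicated two ways (polling vs. log replay).
class CdcComparison {

    static final class Row {
        final int id;
        final String name;
        final long updatedAt; // hypothetical last-modified column used by the poller

        Row(int id, String name, long updatedAt) {
            this.id = id;
            this.name = name;
            this.updatedAt = updatedAt;
        }
    }

    // One log entry; op loosely mimics Debezium's codes: "c" create, "u" update, "d" delete.
    static final class Change {
        final String op;
        final Row row;

        Change(String op, Row row) {
            this.op = op;
            this.row = row;
        }
    }

    final Map<Integer, Row> table = new HashMap<>();
    final List<Change> log = new ArrayList<>(); // stand-in for the binlog

    void upsert(Row row) {
        String op = table.containsKey(row.id) ? "u" : "c";
        table.put(row.id, row);
        log.add(new Change(op, row));
    }

    void delete(int id) {
        Row removed = table.remove(id);
        if (removed != null) {
            log.add(new Change("d", removed));
        }
    }

    // Query-based CDC: copy rows with updatedAt > watermark and return the new watermark.
    // A deleted row is simply absent from the table, so no query can report it.
    static long poll(Map<Integer, Row> table, Map<Integer, Row> replica, long watermark) {
        long next = watermark;
        for (Row row : table.values()) {
            if (row.updatedAt > watermark) {
                replica.put(row.id, row);
                next = Math.max(next, row.updatedAt);
            }
        }
        return next;
    }

    // Log-based CDC: apply every change in order, deletes included.
    static void replay(List<Change> log, Map<Integer, Row> replica) {
        for (Change change : log) {
            if ("d".equals(change.op)) {
                replica.remove(change.row.id);
            } else {
                replica.put(change.row.id, change.row);
            }
        }
    }

    public static void main(String[] args) {
        CdcComparison db = new CdcComparison();
        Map<Integer, Row> polled = new HashMap<>();
        Map<Integer, Row> replayed = new HashMap<>();

        db.upsert(new Row(1, "a", 1));
        db.upsert(new Row(2, "b", 2));
        long watermark = poll(db.table, polled, 0);

        db.delete(1);
        poll(db.table, polled, watermark); // nothing newer, so row 1 lingers in the replica
        replay(db.log, replayed);

        System.out.println("polled:   " + polled.keySet());
        System.out.println("replayed: " + replayed.keySet());
    }
}
```

After row 1 is deleted, the polled replica still holds it, because a query can only return rows that still exist, while the replayed replica drops it.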

Overview of Debezium as an open‑source distributed platform for CDC, describing its low‑latency streaming, durability, and ability to capture row‑level changes without missing events.
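Each row-level change Debezium emits is an envelope carrying a before image, an after image, and an op code: c (create), u (update), d (delete), or r (a row read during the initial snapshot). The sketch below models only those fields with hypothetical classes; real events are Kafka Connect Structs that also carry source metadata and a timestamp.

```java
import java.util.Map;

// Simplified, hypothetical model of a Debezium row-change envelope.
class DebeziumEnvelopeSketch {

    // The four row-level op codes.
    enum Op {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        final String code;

        Op(String code) {
            this.code = code;
        }

        static Op fromCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("unknown op code: " + code);
        }
    }

    static final class ChangeEvent {
        final Op op;
        final Map<String, Object> before; // null for CREATE and READ
        final Map<String, Object> after;  // null for DELETE

        ChangeEvent(Op op, Map<String, Object> before, Map<String, Object> after) {
            this.op = op;
            this.before = before;
            this.after = after;
        }

        // The image that identifies the affected row: "before" for deletes, else "after".
        Map<String, Object> keyImage() {
            return op == Op.DELETE ? before : after;
        }
    }

    public static void main(String[] args) {
        ChangeEvent update = new ChangeEvent(Op.fromCode("u"),
                Map.of("id", 1, "name", "old"), Map.of("id", 1, "name", "new"));
        ChangeEvent delete = new ChangeEvent(Op.fromCode("d"),
                Map.of("id", 1, "name", "new"), null);

        System.out.println(update.op + " " + update.before + " -> " + update.after);
        System.out.println(delete.op + " removes " + delete.keyImage());
    }
}
```

Because an update carries both images and a delete still carries the old row, a log-based consumer can reproduce every change downstream, including removals.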

Advantages and disadvantages of ClickHouse as a column‑oriented OLAP database, including columnar storage, compression, distributed processing, and lack of full transaction support.
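Columnar storage and compression are easiest to see in miniature. The toy Java sketch below stores each column in its own array, so an aggregate over amount never reads the other columns, and a run of identical values such as a date column collapses into a few (value, count) pairs. Class and method names are hypothetical, and run-length encoding is only a stand-in for ClickHouse's real codecs, whose default is LZ4.

```java
import java.util.ArrayList;
import java.util.List;

// Toy column store illustrating per-column layout and compression.
class ColumnarSketch {

    // Run-length encode a column into {value, runLength} pairs.
    static List<long[]> runLengthEncode(long[] column) {
        List<long[]> runs = new ArrayList<>();
        for (int i = 0; i < column.length; ) {
            int j = i;
            while (j < column.length && column[j] == column[i]) {
                j++;
            }
            runs.add(new long[] {column[i], j - i});
            i = j;
        }
        return runs;
    }

    // SUM over one column, computed directly on the runs without decoding them.
    static long sumRuns(List<long[]> runs) {
        long total = 0;
        for (long[] run : runs) {
            total += run[0] * run[1];
        }
        return total;
    }

    public static void main(String[] args) {
        // Row-oriented data: {event_date, user_id, amount}
        long[][] rows = {
            {20210601, 7, 10}, {20210601, 8, 5}, {20210601, 7, 20}, {20210602, 9, 5}
        };

        // Column-oriented layout of the same data; user_id is never read below.
        long[] eventDate = new long[rows.length];
        long[] amount = new long[rows.length];
        for (int r = 0; r < rows.length; r++) {
            eventDate[r] = rows[r][0];
            amount[r] = rows[r][2];
        }

        List<long[]> dateRuns = runLengthEncode(eventDate);
        System.out.println("event_date: " + eventDate.length + " values -> " + dateRuns.size() + " runs");
        System.out.println("SUM(amount) = " + sumRuns(runLengthEncode(amount)));
    }
}
```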

Flink CDC implementation: creating a MySQL source, a custom JsonDebeziumDeserializationSchema to deserialize Debezium records, and a ClickhouseSink to write changes into ClickHouse. The Java code for each part follows.

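import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public static void main(String[] args) throws Exception {
    // Debezium-based MySQL source; the server must have the binlog enabled in ROW format
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("test")   // capture every table in database "test"
        .username("flinkcdc")
        .password("dafei1288")
        .deserializer(new JsonDebeziumDeserializationSchema()) // Debezium record -> JSON string
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(sourceFunction)
        .addSink(new ClickhouseSink()); // writes matching rows to ClickHouse over JDBC

    env.execute("mysql2clickhouse");
}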

Custom deserialization schema that extracts database, table, operation type, and after‑image fields, converts them to JSON, and emits the result.

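// Imports for this class (top of the enclosing file)
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.HashMap;
import java.util.Map;

public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    // Gson is not Serializable, so keep one static instance instead of an instance field
    private static final Gson GSON = new Gson();

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Row-change topics are named <server-name>.<database>.<table>
        String[] split = sourceRecord.topic().split("[.]");
        Struct struct = (Struct) sourceRecord.value();
        if (split.length != 3 || struct == null) {
            return; // skip schema-change, heartbeat and tombstone records
        }
        Map<String, Object> hs = new HashMap<>();
        hs.put("database", split[1]);
        hs.put("table", split[2]);

        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        // "after" is null for deletes, so use the "before" image there
        Struct row = struct.getStruct(operation == Envelope.Operation.DELETE ? "before" : "after");
        if (row != null) {
            Map<String, Object> data = new HashMap<>();
            for (Field field : row.schema().fields()) {
                data.put(field.name(), row.get(field.name()));
            }
            hs.put("data", data);
        }

        String type = operation.toString().toLowerCase();
        // CREATE (live inserts) and READ (initial snapshot rows) both become "insert"
        if ("create".equals(type) || "read".equals(type)) {
            type = "insert";
        }
        hs.put("type", type);
        collector.collect(GSON.toJson(hs));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}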
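Two details of this deserializer are easy to get wrong: the topic split assumes exactly three dot-separated parts, and Debezium reports rows from the initial snapshot with the READ operation rather than CREATE, so a sink that only accepts "insert" must treat both alike. The pure-Java helpers below isolate those two steps so they can be unit-tested without Flink or Kafka Connect on the classpath. The class, method names, and the topic "my_server.test.test_cdc" are hypothetical.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical helpers mirroring two steps of the deserializer.
class DebeziumRecordHelpers {

    static final class TableId {
        final String database;
        final String table;

        TableId(String database, String table) {
            this.database = database;
            this.table = table;
        }
    }

    // Row-change topics look like "<server-name>.<database>.<table>". Anything else
    // (for example a topic named after the server alone) yields Optional.empty()
    // instead of an ArrayIndexOutOfBoundsException.
    static Optional<TableId> parseTopic(String topic) {
        String[] parts = topic.split("\\.");
        if (parts.length != 3) {
            return Optional.empty();
        }
        return Optional.of(new TableId(parts[1], parts[2]));
    }

    // Snapshot reads and creates both become "insert"; other names pass through lowercased.
    static String normalizeType(String operationName) {
        String type = operationName.toLowerCase(Locale.ROOT);
        return ("create".equals(type) || "read".equals(type)) ? "insert" : type;
    }

    public static void main(String[] args) {
        parseTopic("my_server.test.test_cdc")
                .ifPresent(id -> System.out.println(id.database + " / " + id.table));
        System.out.println(parseTopic("my_server").isPresent());
        System.out.println(normalizeType("READ") + ", " + normalizeType("UPDATE"));
    }
}
```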

ClickHouse sink that establishes a JDBC connection and inserts records into a target table.

// Imports for this class (top of the enclosing file)
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Map;

public static class ClickhouseSink extends RichSinkFunction<String> {
    private static final Gson GSON = new Gson();
    // JDBC objects are not serializable; they are created per task in open()
    private transient Connection connection;
    private transient PreparedStatement pstmt;

    private Connection getConnection() throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); // driver from clickhouse-jdbc 0.2.x
        String url = "jdbc:clickhouse://localhost:8123/default";  // 8123 is ClickHouse's HTTP port
        return DriverManager.getConnection(url, "default", "dafei1288");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
        pstmt = connection.prepareStatement(sql);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void invoke(String value, Context context) throws Exception {
        Map<String, Object> hs = GSON.fromJson(value, Map.class);
        String database = (String) hs.get("database");
        String table = (String) hs.get("table");
        String type = (String) hs.get("type");
        // Only inserts from test.test_cdc are written; updates and deletes are ignored
        if ("test".equals(database) && "test_cdc".equals(table) && "insert".equals(type)) {
            Map<String, Object> data = (Map<String, Object>) hs.get("data");
            // Gson decodes untyped JSON numbers as Double, so go through Number
            Number id = (Number) data.get("id");
            pstmt.setInt(1, id.intValue());
            pstmt.setString(2, (String) data.get("name"));
            pstmt.setString(3, (String) data.get("description"));
            pstmt.executeUpdate(); // one round trip per row
        }
    }

    @Override
    public void close() throws Exception {
        if (pstmt != null) pstmt.close();
        if (connection != null) connection.close();
        super.close();
    }
}
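This sink issues one executeUpdate() per row, while ClickHouse generally prefers fewer, larger inserts. A common remedy is to buffer rows and write them in batches. The generic buffer below is a minimal sketch of that idea, not part of the original code; BatchBuffer and its methods are hypothetical, and in a JDBC sink the flush action would typically call addBatch() for each row and executeBatch() once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-based buffer: rows accumulate until batchSize is reached,
// then the whole batch is handed to flushAction in a single call.
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flushAction) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Call this on shutdown as well (and, for stronger guarantees, on checkpoints)
    // so that a partially filled batch is not lost.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(pending)); // hand over a copy
        pending.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            buffer.add("row-" + i);
        }
        buffer.flush(); // drain the remainder, as close() would
        System.out.println(batchSizes); // prints [3, 3, 1]
    }
}
```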

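Flink SQL CDC example: DDL for a MySQL CDC source table and a JDBC sink table, followed by an INSERT ... SELECT statement that continuously streams every change (inserts, updates, and deletes) into the sink. Note that the sink DDL as written uses the MySQL JDBC driver: Flink 1.13's JDBC connector ships dialects only for MySQL, PostgreSQL, and Derby, so writing to ClickHouse from Flink SQL requires a ClickHouse-capable connector or a custom dialect.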

String sourceDDL = "CREATE TABLE mysql_binlog (\n"
    + " id INT NOT NULL,\n"
    + " name STRING,\n"
    + " description STRING\n"
    + ") WITH (\n"
    + " 'connector' = 'mysql-cdc',\n"
    + " 'hostname' = 'localhost',\n"
    + " 'port' = '3306',\n"
    + " 'username' = 'flinkcdc',\n"
    + " 'password' = 'dafei1288',\n"
    + " 'database-name' = 'test',\n"
    + " 'table-name' = 'test_cdc'\n"
    + ")";

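// url, userName, password and mysqlSinkTable are String variables defined elsewhere
// in the original program (not shown). With a PRIMARY KEY declared, Flink's JDBC
// connector writes in upsert mode, so updates and deletes are applied by key.
String sinkDDL = "CREATE TABLE test_cdc_sink (\n"
    + " id INT NOT NULL,\n"
    + " name STRING,\n"
    + " description STRING,\n"
    + " PRIMARY KEY (id) NOT ENFORCED\n"
    + ") WITH (\n"
    + " 'connector' = 'jdbc',\n"
    + " 'driver' = 'com.mysql.jdbc.Driver',\n"
    + " 'url' = '" + url + "',\n"
    + " 'username' = '" + userName + "',\n"
    + " 'password' = '" + password + "',\n"
    + " 'table-name' = '" + mysqlSinkTable + "'\n"
    + ")";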

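String transformSQL = "insert into test_cdc_sink select * from mysql_binlog";

// Register both tables, then submit the continuous INSERT job.
// StreamTableEnvironment is org.apache.flink.table.api.bridge.java.StreamTableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);
tableEnv.executeSql(transformSQL).await(); // executeSql submits asynchronously; await() blocks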
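The INSERT ... SELECT is not a one-off copy: the mysql-cdc table produces a changelog, and because test_cdc_sink declares PRIMARY KEY (id) NOT ENFORCED, the sink applies it by key. The toy model below shows how such a changelog collapses into the final table state. Its class names are hypothetical, and the Kind enum merely mirrors the four kinds of Flink's org.apache.flink.types.RowKind (+I insert, -U update-before, +U update-after, -D delete); it is not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of an upsert sink keyed on id materializing a changelog.
class ChangelogUpsertSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final int id;
        final String name;

        Change(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    static Map<Integer, String> apply(List<Change> changelog) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.name); // upsert by key
                    break;
                case DELETE:
                    table.remove(c.id);      // delete by key
                    break;
                case UPDATE_BEFORE:
                    break;                   // an upsert sink can ignore the old image
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
                new Change(Kind.INSERT, 1, "alice"),
                new Change(Kind.INSERT, 2, "bob"),
                new Change(Kind.UPDATE_BEFORE, 1, "alice"),
                new Change(Kind.UPDATE_AFTER, 1, "alicia"),
                new Change(Kind.DELETE, 2, "bob"));
        System.out.println(apply(changelog)); // prints {1=alicia}
    }
}
```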

Maven dependencies required for the project, including Flink core, streaming, JDBC connector, Flink‑CDC MySQL connector, ClickHouse JDBC driver, and Gson.

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.0</version>
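    </dependency>
    <!-- Required to run env.execute() locally since Flink 1.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!-- Required for the Flink SQL variant (StreamTableEnvironment and planner) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>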
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.6</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
</dependencies>

Running the DataStream job synchronizes data from MySQL to ClickHouse; the original article shows screenshots of the successful run.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Wang Zhiwu (Big Data Technology & Architecture), a big data expert dedicated to sharing big data technology.
