
Using Flink CDC to Capture MySQL Changes and Sync Them to ClickHouse

This article introduces Change Data Capture (CDC), compares query‑based and log‑based CDC, explains Debezium and ClickHouse, and provides step‑by‑step Flink CDC and Flink SQL CDC examples—including full Java code—to stream MySQL binlog changes into ClickHouse for real‑time analytics.

Big Data Technology & Architecture

Change Data Capture (CDC) is a technique for capturing INSERT, UPDATE, and DELETE operations from a database and delivering those changes to downstream systems. It is widely used for data synchronization, building analytical platforms, sharing state between micro‑services, and updating caches or CQRS query views.

The article first distinguishes query‑based CDC (which scans tables) from log‑based CDC (which reads database logs such as MySQL binlog). Log‑based CDC offers low latency, non‑intrusive capture, and the ability to capture delete events and old record states.
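
To make the difference concrete, here is a minimal sketch in plain Java that simulates both approaches without any database. The class CdcComparison, its in-memory table, and its change log are hypothetical stand-ins, not part of Flink CDC or Debezium. A query-based poller sees only the rows that exist at poll time, while a log reader replays every recorded operation, including deletes and the value a row had before an update.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; not part of Flink CDC or Debezium.
final class CdcComparison {

    // One entry of a simulated change log (a stand-in for a binlog event).
    static final class LogEntry {
        final String op;      // "insert", "update" or "delete"
        final int id;
        final String before;  // old value, null for inserts
        final String after;   // new value, null for deletes

        LogEntry(String op, int id, String before, String after) {
            this.op = op;
            this.id = id;
            this.before = before;
            this.after = after;
        }
    }

    final Map<Integer, String> table = new LinkedHashMap<>();
    final List<LogEntry> log = new ArrayList<>();

    void insert(int id, String value) {
        table.put(id, value);
        log.add(new LogEntry("insert", id, null, value));
    }

    void update(int id, String value) {
        String old = table.put(id, value);
        log.add(new LogEntry("update", id, old, value));
    }

    void delete(int id) {
        String old = table.remove(id);
        log.add(new LogEntry("delete", id, old, null));
    }

    // Query-based CDC: a periodic scan returns only the current rows.
    Map<Integer, String> poll() {
        return new LinkedHashMap<>(table);
    }

    // Log-based CDC: read every entry recorded since the given offset.
    List<LogEntry> readLog(int fromOffset) {
        return new ArrayList<>(log.subList(fromOffset, log.size()));
    }

    public static void main(String[] args) {
        CdcComparison db = new CdcComparison();
        db.insert(1, "a");
        db.update(1, "b");
        db.insert(2, "c");
        db.delete(2);

        // The poll has no trace of row 2 or of the old value "a".
        System.out.println("poll sees rows: " + db.poll());
        for (LogEntry e : db.readLog(0)) {
            System.out.println(e.op + " id=" + e.id
                    + " before=" + e.before + " after=" + e.after);
        }
    }
}
```

In this simulation the poll returns a single row, while the log yields four entries. That gap is the reason log-based capture is preferred when deletes or intermediate states matter.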

Debezium is introduced as an open‑source, low‑latency streaming platform for CDC. It monitors databases, emits row‑level change events, and guarantees durability so applications never miss events even after restarts.
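
A Debezium change event carries an operation code together with the row state before and after the change. The sketch below models that envelope in plain Java to show how a consumer might interpret it. ChangeEnvelope is a hypothetical, simplified class rather than a Debezium type; only the op codes ("c", "u", "d", "r") and the before/after fields follow Debezium's event format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a Debezium change event value.
final class ChangeEnvelope {
    final String op;                   // "c", "u", "d" or "r"
    final Map<String, Object> before;  // row state before the change, may be null
    final Map<String, Object> after;   // row state after the change, may be null

    ChangeEnvelope(String op, Map<String, Object> before, Map<String, Object> after) {
        this.op = op;
        this.before = before;
        this.after = after;
    }

    // Translates the one-letter op code into a readable name.
    String operationName() {
        switch (op) {
            case "c": return "create";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "read";   // row emitted during the initial snapshot
            default: throw new IllegalArgumentException("unknown op code: " + op);
        }
    }

    // Deletes have no "after" image, so fall back to the "before" image.
    Map<String, Object> rowImage() {
        return "d".equals(op) ? before : after;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRow = new HashMap<>();
        oldRow.put("id", 1);
        oldRow.put("name", "flink");

        ChangeEnvelope deleted = new ChangeEnvelope("d", oldRow, null);
        System.out.println(deleted.operationName() + " " + deleted.rowImage());
    }
}
```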

ClickHouse is presented as a column‑oriented OLAP database suitable for real‑time analytics. Its advantages include true columnar storage, data compression, distributed processing, and vectorized query execution, while its drawbacks are lack of full transaction support and limited update/delete capabilities.

Flink CDC Example

The following Java code shows how to create a Flink CDC source that reads MySQL binlog via Debezium, deserialize the change events to JSON, and sink them into ClickHouse.

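// Obtain the streaming environment that will run the job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build a Debezium-backed source that reads the binlog of the "test" database.
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("test")
        .username("flinkcdc")
        .password("dafei1288")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();

// Wire the source to the ClickHouse sink.
env.addSource(sourceFunction)
   .addSink(new ClickhouseSink());

// Nothing runs until the job is submitted.
env.execute();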

The custom deserialization schema converts Debezium records into a JSON structure:

public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    // Gson is thread-safe; a static instance is not serialized with the schema
    // and avoids allocating a new Gson for every record.
    private static final Gson GSON = new Gson();

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        HashMap<String, Object> hs = new HashMap<>();

        // The topic has the form <serverName>.<database>.<table>.
        String[] split = sourceRecord.topic().split("[.]");
        hs.put("database", split[1]);
        hs.put("table", split[2]);

        // Copy the post-change row image; it is null for delete events.
        Struct struct = (Struct) sourceRecord.value();
        Struct after = struct.getStruct("after");
        if (after != null) {
            HashMap<String, Object> afhs = new HashMap<>();
            for (Field field : after.schema().fields()) {
                afhs.put(field.name(), after.get(field.name()));
            }
            hs.put("data", afhs);
        }

        // Debezium reports inserts as CREATE; rename them to "insert".
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        hs.put("type", type);

        collector.collect(GSON.toJson(hs));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
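
The schema above relies on two small conventions: the topic name splits into server name, database, and table, and the CREATE operation is renamed to "insert". The following sketch isolates both steps in plain Java so they can be checked without Flink. TopicParts and its methods are hypothetical helpers, and the topic value mysql_binlog_source.test.test_cdc is an assumed example, not output captured from a real job.

```java
import java.util.Locale;

// Hypothetical helper; the class and method names are illustrative only.
final class TopicParts {
    final String database;
    final String table;

    private TopicParts(String database, String table) {
        this.database = database;
        this.table = table;
    }

    // Splits "<serverName>.<database>.<table>" and rejects any other shape,
    // instead of failing later with an ArrayIndexOutOfBoundsException.
    static TopicParts parse(String topic) {
        String[] parts = topic.split("[.]");
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected topic: " + topic);
        }
        return new TopicParts(parts[1], parts[2]);
    }

    // Mirrors the renaming in the schema: "create" becomes "insert",
    // every other operation name is only lower-cased.
    static String normalizeType(String operationName) {
        String type = operationName.toLowerCase(Locale.ROOT);
        return "create".equals(type) ? "insert" : type;
    }

    public static void main(String[] args) {
        // Assumed example topic.
        TopicParts p = TopicParts.parse("mysql_binlog_source.test.test_cdc");
        System.out.println(p.database + " / " + p.table);
        System.out.println(normalizeType("CREATE"));
    }
}
```

This simple split assumes that neither the database name nor the table name contains a dot; names that do would need a different parsing rule.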

The sink writes the captured rows into ClickHouse using a prepared statement:

public static class ClickhouseSink extends RichSinkFunction<String> {
    private static final Gson GSON = new Gson();
    // JDBC handles are not serializable, so they are created in open().
    private transient Connection connection;
    private transient PreparedStatement pstmt;

    private Connection getConnection() throws Exception {
        // Let failures propagate; returning null here would only surface
        // later as a NullPointerException in open().
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String url = "jdbc:clickhouse://localhost:8123/default";
        return DriverManager.getConnection(url, "default", "dafei1288");
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
        pstmt = connection.prepareStatement(sql);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void invoke(String value, Context context) throws Exception {
        Map<String, Object> hs = GSON.fromJson(value, HashMap.class);
        String database = (String) hs.get("database");
        String table = (String) hs.get("table");
        String type = (String) hs.get("type");
        // Only inserts into test.test_cdc are written; updates, deletes,
        // and snapshot reads are skipped.
        if ("test".equals(database) && "test_cdc".equals(table) && "insert".equals(type)) {
            Map<String, Object> data = (Map<String, Object>) hs.get("data");
            // Gson parses untyped JSON numbers as Double.
            pstmt.setInt(1, ((Number) data.get("id")).intValue());
            pstmt.setString(2, (String) data.get("name"));
            pstmt.setString(3, (String) data.get("description"));
            pstmt.executeUpdate();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (pstmt != null) pstmt.close();
        if (connection != null) connection.close();
    }
}
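
The sink has to convert the id field because Gson returns untyped JSON numbers as Double, so an integer column arrives as a value such as 7.0. The sketch below shows one way to read such fields defensively in plain Java. RowFields and its methods are hypothetical helpers, not part of Gson or Flink, and the sample row is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for reading fields from a loosely typed row map.
final class RowFields {

    // Accepts any Number (Double, Integer, Long, ...) and narrows it to int.
    static int intField(Map<String, Object> row, String name) {
        Object value = row.get(name);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException(
                    "field '" + name + "' is missing or not numeric: " + value);
        }
        return ((Number) value).intValue();
    }

    // Returns null for a missing field, otherwise its string form.
    static String stringField(Map<String, Object> row, String name) {
        Object value = row.get(name);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        // Made-up row, shaped like the "data" map the sink receives.
        Map<String, Object> row = new HashMap<>();
        row.put("id", 7.0);          // the Double a JSON parser would produce
        row.put("name", "flink");

        System.out.println(intField(row, "id"));
        System.out.println(stringField(row, "name"));
        System.out.println(stringField(row, "description"));
    }
}
```

Checking for Number instead of casting to Double keeps the conversion working if the upstream JSON handling changes, and the explicit exception names the offending field rather than failing with a bare ClassCastException.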

Running the job streams rows inserted into test.test_cdc in MySQL to the sink_ch_test table in ClickHouse. The original article confirms this with screenshots of the ingested data, which are not reproduced here.

Flink SQL CDC

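A CDC pipeline of the same shape can be expressed in Flink SQL with three statements: a source table definition that uses the mysql-cdc connector, a sink table definition that uses the jdbc connector, and an INSERT‑SELECT statement that moves the data. Note that the sink definition below uses the MySQL JDBC driver and URL, so this example writes to a MySQL table named test_cdc_sink rather than to ClickHouse.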

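// Java string literals cannot span lines, so each DDL is built by concatenation.
String sourceDDL =
    "CREATE TABLE mysql_binlog (" +
    "  id INT NOT NULL," +
    "  name STRING," +
    "  description STRING" +
    ") WITH (" +
    "  'connector' = 'mysql-cdc'," +
    "  'hostname' = 'localhost'," +
    "  'port' = '3306'," +
    "  'username' = 'flinkcdc'," +
    "  'password' = 'dafei1288'," +
    "  'database-name' = 'test'," +
    "  'table-name' = 'test_cdc'" +
    ")";

// The primary key lets the JDBC sink apply updates and deletes as upserts.
String sinkDDL =
    "CREATE TABLE test_cdc_sink (" +
    "  id INT NOT NULL," +
    "  name STRING," +
    "  description STRING," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'driver' = 'com.mysql.jdbc.Driver'," +
    "  'url' = 'jdbc:mysql://127.0.0.1:3306/test'," +
    "  'username' = 'root'," +
    "  'password' = 'dafei1288'," +
    "  'table-name' = 'test_cdc_sink'" +
    ")";

String transformSQL = "INSERT INTO test_cdc_sink SELECT * FROM mysql_binlog";

// Submit the three statements in order through a TableEnvironment,
// for example tableEnv.executeSql(sourceDDL), where tableEnv is assumed to exist.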

The article also lists the Maven dependencies required for Flink, the MySQL CDC connector, the ClickHouse connector, and Gson.

Overall, the guide provides a complete end‑to‑end solution for real‑time data replication from MySQL to ClickHouse using Flink CDC, both programmatically and via SQL.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
