Using Flink CDC to Capture MySQL Changes and Sync Them to ClickHouse
This article introduces Change Data Capture (CDC), compares query‑based and log‑based CDC, explains Debezium and ClickHouse, and provides step‑by‑step Flink CDC and Flink SQL CDC examples—including full Java code—to stream MySQL binlog changes into ClickHouse for real‑time analytics.
Change Data Capture (CDC) is a technique for capturing INSERT, UPDATE, DELETE operations from a database and delivering the changes downstream. It is widely used for data synchronization, building analytical platforms, sharing state between micro‑services, and updating caches or CQRS query views.
The article first distinguishes query‑based CDC (which scans tables) from log‑based CDC (which reads database logs such as MySQL binlog). Log‑based CDC offers low latency, non‑intrusive capture, and the ability to capture delete events and old record states.
Debezium is introduced as an open‑source, low‑latency streaming platform for CDC. It monitors databases, emits row‑level change events, and guarantees durability so applications never miss events even after restarts.
ClickHouse is presented as a column‑oriented OLAP database suitable for real‑time analytics. Its advantages include true columnar storage, data compression, distributed processing, and vectorized query execution, while its drawbacks are lack of full transaction support and limited update/delete capabilities.
Flink CDC Example
The following Java code shows how to create a Flink CDC source that reads MySQL binlog via Debezium, deserialize the change events to JSON, and sink them into ClickHouse.
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("test")
.username("flinkcdc")
.password("dafei1288")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
// add source
env.addSource(sourceFunction)
// add sink
.addSink(new ClickhouseSink());The custom deserialization schema converts Debezium records into a JSON structure:
public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
Gson jsstr = new Gson();
HashMap<String, Object> hs = new HashMap<>();
String topic = sourceRecord.topic();
String[] split = topic.split("[.]");
String database = split[1];
String table = split[2];
hs.put("database", database);
hs.put("table", table);
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
Struct struct = (Struct) sourceRecord.value();
Struct after = struct.getStruct("after");
if (after != null) {
Schema schema = after.schema();
HashMap<String, Object> afhs = new HashMap<>();
for (Field field : schema.fields()) {
afhs.put(field.name(), after.get(field.name()));
}
hs.put("data", afhs);
}
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
hs.put("type", type);
collector.collect(jsstr.toJson(hs));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}The sink writes the captured rows into ClickHouse using a prepared statement:
public static class ClickhouseSink extends RichSinkFunction<String> {
Connection connection;
PreparedStatement pstmt;
private Connection getConnection() {
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
String url = "jdbc:clickhouse://localhost:8123/default";
return DriverManager.getConnection(url, "default", "dafei1288");
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
pstmt = connection.prepareStatement(sql);
}
@Override
public void invoke(String value, Context context) throws Exception {
Gson t = new Gson();
HashMap<String,Object> hs = t.fromJson(value, HashMap.class);
String database = (String)hs.get("database");
String table = (String)hs.get("table");
String type = (String)hs.get("type");
if ("test".equals(database) && "test_cdc".equals(table) && "insert".equals(type)) {
LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");
pstmt.setInt(1, ((Double)data.get("id")).intValue());
pstmt.setString(2, (String)data.get("name"));
pstmt.setString(3, (String)data.get("description"));
pstmt.executeUpdate();
}
}
@Override
public void close() throws Exception {
super.close();
if (pstmt != null) pstmt.close();
if (connection != null) connection.close();
}
}Running the job streams MySQL changes into ClickHouse, as shown by the screenshots of successful data ingestion.
Flink SQL CDC
The same CDC pipeline can be expressed with Flink SQL using three statements: a source table definition with the mysql-cdc connector, a sink table definition with the jdbc connector, and an INSERT‑SELECT statement to move data.
String sourceDDL = "CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkcdc',
'password' = 'dafei1288',
'database-name' = 'test',
'table-name' = 'test_cdc'
)";
String sinkDDL = "CREATE TABLE test_cdc_sink (
id INT NOT NULL,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.jdbc.Driver',
'url' = 'jdbc:mysql://127.0.0.1:3306/test',
'username' = 'root',
'password' = 'dafei1288',
'table-name' = 'test_cdc_sink'
)";
String transformSQL = "INSERT INTO test_cdc_sink SELECT * FROM mysql_binlog";The article also lists the Maven dependencies required for Flink, the MySQL CDC connector, the ClickHouse connector, and Gson.
Overall, the guide provides a complete end‑to‑end solution for real‑time data replication from MySQL to ClickHouse using Flink CDC, both programmatically and via SQL.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
