Real‑Time Sync of New MySQL Tables to Doris Using Flink CDC
This article explains how to extend a Flink CDC job that already syncs an entire MySQL database to Doris, so that tables created in MySQL while the job is running are also created in Doris in real time. It relies on the CdcTools utility, side-output streams, and asynchronous I/O.
0. Feasible Solution
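The hardest part is translating MySQL table definitions into Doris table definitions inside Flink code; hand-writing field-by-field type mappings is error-prone and tedious. The approach below avoids this by reusing CdcTools from flink-doris-connector, which already creates Doris tables from MySQL schemas, and by routing create-table DDL away from the DML stream.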
1. CdcTools Invocation
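The CdcTools class in flink-doris-connector is the entry point of its whole-database sync tool (mysql-sync-database) and can create tables directly from the command line; with --create-table-only it only synchronizes the table structures. The same call can be embedded in Flink code: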
CdcTools.main(Array[String](
  "mysql-sync-database",
  "--create-table-only",
  "--database", "test",
  "--mysql-conf", "hostname=192.xxx.xx.xxx",
  "--mysql-conf", "port=3306",
  "--mysql-conf", "username=mysql_user",
  "--mysql-conf", "password=****",
  "--mysql-conf", "database-name=test",
  "--sink-conf", "fenodes=192.168.xxx.xx:8030",
  "--sink-conf", "username=doris_user",
  "--sink-conf", "password=****",
  "--sink-conf", "jdbc-url=jdbc:mysql://192.168.xxx.xxx:9030",
  "--sink-conf", "sink.label-prefix=label01",
  "--table-conf", "replication_num=2"
))
2. Flink CDC Implementation Strategy
Two kinds of events need different handling:
Create‑table DDL – should be sent to a side output stream.
Insert/update/delete DML – stays on the main output stream and is written to Doris by the connector's Doris sink.
Side‑output streams allow the DDL branch to be processed independently, for example with asynchronous I/O to avoid blocking the main data flow.
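The split can be modeled in plain Scala before wiring it into Flink. This is a sketch under simplifying assumptions: the hypothetical CdcEvent, SchemaChange, and RowChange types stand in for the parsed Flink CDC JSON, and a two-way partition stands in for Flink's OutputTag and ctx.output. Only CREATE schema changes reach the side branch; other DDL (ALTER, DROP) is simply dropped in this sketch.

```scala
object DdlDmlRouting {
  // Hypothetical event model standing in for parsed Flink CDC JSON records
  sealed trait CdcEvent
  final case class SchemaChange(ddlType: String, tableId: String) extends CdcEvent
  final case class RowChange(op: String, table: String) extends CdcEvent

  // Returns (main stream, side output): row changes stay on the main stream,
  // CREATE schema changes go to the side branch, other DDL is dropped here.
  def route(events: Seq[CdcEvent]): (Seq[RowChange], Seq[SchemaChange]) = {
    val main = events.collect { case r: RowChange => r }
    val side = events.collect {
      case s: SchemaChange if "CREATE".equalsIgnoreCase(s.ddlType) => s
    }
    (main, side)
  }
}
```

In the real job the same predicate lives in processElement: matching records go to ctx.output, everything else to out.collect.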
3. Core Flink Coding
First, detect whether a record is a schema change by checking for the historyRecord key in the JSON produced by Flink CDC. Such records are only emitted when the MySQL source is built with includeSchemaChanges(true):
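// Inside ProcessFunction.processElement: json is the parsed CDC record and
// ddlSideOutput is an OutputTag[JSONObject] used to split off create-table DDL
if (json.containsKey("historyRecord")) {
  val histJson = json.getJSONObject("historyRecord")
  if (histJson.containsKey("tableChanges") && !histJson.getJSONArray("tableChanges").isEmpty) {
    val tableChange = histJson.getJSONArray("tableChanges").getJSONObject(0)
    val ddlType = tableChange.getString("type") // ALTER / CREATE / DROP
    if ("CREATE".equalsIgnoreCase(ddlType)) {
      ctx.output(ddlSideOutput, histJson)
    }
  }
}
// Records without historyRecord are DML; emit them on the main output with out.collect
For the DDL side stream, obtained with getSideOutput(ddlSideOutput), use AsyncDataStream.unorderedWait to invoke CdcTools without blocking the main stream: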
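import java.util.Collections
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.function.BiConsumer
import com.alibaba.fastjson.JSONObject // assuming fastjson, as the getJSONObject/getJSONArray calls suggest
import org.apache.doris.flink.tools.cdc.CdcTools
import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.functions.async.{AsyncFunction, ResultFuture}

AsyncDataStream.unorderedWait(createDS,
  new AsyncFunction[JSONObject, String] {
    override def asyncInvoke(input: JSONObject, resultFuture: ResultFuture[String]): Unit = {
      // asyncInvoke runs on the task thread: move the blocking CdcTools call to another thread
      CompletableFuture.runAsync(new Runnable {
        override def run(): Unit = {
          val tableChange = input.getJSONArray("tableChanges").getJSONObject(0)
          if ("CREATE".equalsIgnoreCase(tableChange.getString("type"))) {
            CdcTools.main(Array[String](
              "mysql-sync-database",
              "--create-table-only",
              "--database", "test",
              // ... same MySQL and Doris parameters as above ...
            ))
          }
        }
      }).whenComplete(new BiConsumer[Void, Throwable] {
        // Emit only after table creation finishes; surface failures to Flink
        override def accept(ignored: Void, error: Throwable): Unit =
          if (error == null) resultFuture.complete(Collections.singleton("done"))
          else resultFuture.completeExceptionally(error)
      })
    }
  },
  // Generous timeout: the default AsyncFunction.timeout fails the job when it expires
  60, TimeUnit.SECONDS, 1).setParallelism(1)
Because asyncInvoke itself runs on the task thread, calling CdcTools.main directly inside it would still stall the operator. CompletableFuture.runAsync moves the call to another thread, and the ResultFuture is completed only after table creation succeeds or fails. The main stream continues to write DML records to Doris using the standard sink.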
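Both CdcTools calls repeat the same connection flags. One way to keep them in sync is to build the argument array once from ordered key/value pairs. The helper below, buildCdcArgs in a CdcArgs object, is hypothetical and not part of flink-doris-connector; it only reuses the flag names shown above.

```scala
object CdcArgs {
  // Hypothetical helper: flattens connection settings into the flag list
  // passed to CdcTools.main, so every call site shares one definition.
  def buildCdcArgs(
      database: String,
      mysqlConf: Seq[(String, String)],
      sinkConf: Seq[(String, String)],
      tableConf: Seq[(String, String)]
  ): Array[String] = {
    // Each (key, value) pair becomes: flag, "key=value"
    def flags(flag: String, conf: Seq[(String, String)]): Seq[String] =
      conf.flatMap { case (k, v) => Seq(flag, s"$k=$v") }

    (Seq("mysql-sync-database", "--create-table-only", "--database", database) ++
      flags("--mysql-conf", mysqlConf) ++
      flags("--sink-conf", sinkConf) ++
      flags("--table-conf", tableConf)).toArray
  }
}
```

Both call sites could then use CdcTools.main(CdcArgs.buildCdcArgs("test", mysqlConf, sinkConf, Seq("replication_num" -> "2"))). Ordered pairs are used instead of a Map so the generated flag order is deterministic.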
4. Important Note
Doris marks deleted rows in Unique Key tables through the hidden column __DORIS_DELETE_SIGN__ (0 for a normal row, 1 for a deleted one). When the load includes this column, as it does here, every forwarded DML row must carry the field; otherwise Doris rejects the record. For inserts and updates, set it to 0:
rowJson.put("__DORIS_DELETE_SIGN__", 0)
For delete events, forward the row's before-image with the value 1 instead. If the column is missing, Doris returns an error such as:
Reason: There is no column matching jsonpaths in the json file, columns:[id, name, __DORIS_DELETE_SIGN__], please check columns and jsonpaths: {"name":"X","id":5}
By leveraging the built-in CdcTools and separating DDL from DML, the solution synchronizes newly created MySQL tables to Doris in real time with minimal code changes and without blocking the main DML stream.
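The delete-sign rule from the note above can be expressed as a small mapping from Debezium operation codes (c = insert, r = snapshot read, u = update, d = delete) to the row image and flag value to forward. This sketch assumes Scala Maps as a stand-in for the fastjson rowJson object; the DeleteSign object and toDorisRow name are hypothetical.

```scala
object DeleteSign {
  val DeleteSignColumn = "__DORIS_DELETE_SIGN__"

  // Picks the row image to forward and attaches the delete-sign value:
  // inserts, snapshot reads and updates forward the after-image with 0,
  // deletes forward the before-image with 1.
  // Note: an update that changes the primary key would also need a delete
  // of the old key; that case is not handled in this sketch.
  def toDorisRow(op: String, before: Map[String, Any], after: Map[String, Any]): Map[String, Any] =
    op match {
      case "c" | "r" | "u" => after + (DeleteSignColumn -> 0)
      case "d"             => before + (DeleteSignColumn -> 1)
      case other           => throw new IllegalArgumentException(s"Unsupported op: $other")
    }
}
```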
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
