Sync New MySQL Tables to Doris in Real‑Time with Flink CDC and CdcTools
This article explains how to use Flink CDC together with the CdcTools utility to automatically capture newly created MySQL tables and synchronize both their schema and data to a Doris database in real time, covering the required code, side‑output handling, async execution, and a special delete‑sign field.
When Flink CDC replicates an entire MySQL database to Doris, a further challenge arises: tables created in MySQL after the job starts must also be synchronized, even though the upstream side sends no notification when a table is added. The solution relies on the CdcTools class provided by the flink‑doris‑connector, which can create Doris tables from the schemas of the source MySQL tables.
1. Using CdcTools
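CdcTools can be invoked from Flink code just like the command‑line tool, by passing the same arguments to CdcTools.main. With --create-table-only, the following call creates the Doris tables without synchronizing any data: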
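import org.apache.doris.flink.tools.cdc.CdcTools

CdcTools.main(Array[String](
  "mysql-sync-database",                        // sync an entire MySQL database
  "--create-table-only",                        // create the Doris tables only, no data sync
  "--database", "test",                         // target Doris database
  "--mysql-conf", "hostname=192.xxx.xx.xxx",
  "--mysql-conf", "port=3306",
  "--mysql-conf", "username=mysql_user",
  "--mysql-conf", "password=****",
  "--mysql-conf", "database-name=test",         // source MySQL database
  "--sink-conf", "fenodes=192.168.xxx.xx:8030", // Doris FE HTTP address
  "--sink-conf", "username=doris_user",
  "--sink-conf", "password=****",
  "--sink-conf", "jdbc-url=jdbc:mysql://192.168.xxx.xxx:9030", // Doris FE MySQL-protocol port
  "--sink-conf", "sink.label-prefix=label01",   // prefix for Doris stream load labels
  "--table-conf", "replication_num=2"           // property applied to the created tables
))
2. Detecting CREATE DDL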
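When the MySQL source is built with includeSchemaChanges(true), Flink CDC also emits DDL events, and in its JSON output these records carry a historyRecord field. Checking that field, and the type of the first entry in its tableChanges array, separates CREATE statements from normal DML events: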
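// Inside processElement of a ProcessFunction[String, JSONObject], assuming fastjson:
// json = JSON.parseObject(value), ddlSideOutput is an OutputTag[JSONObject]
if (json.containsKey("historyRecord")) {
  // historyRecord holds a JSON string; fastjson's getJSONObject parses it
  val histJson = json.getJSONObject("historyRecord")
  if (histJson.containsKey("tableChanges") && !histJson.getJSONArray("tableChanges").isEmpty) {
    val tableChange = histJson.getJSONArray("tableChanges").getJSONObject(0)
    val ddlType = tableChange.getString("type") // ALTER/CREATE/DROP
    if ("CREATE".equalsIgnoreCase(ddlType)) {
      ctx.output(ddlSideOutput, histJson)       // CREATE goes to the side output
    }
  }
} else {
  out.collect(json)                             // DML records stay on the main stream
}
3. Asynchronous Execution of CREATE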
CREATE operations are sent to a side output stream and processed with Flink’s asynchronous I/O to avoid blocking the main pipeline:
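import java.util.Collections
import java.util.concurrent.{CompletableFuture, TimeUnit}
import com.alibaba.fastjson.JSONObject // assumes fastjson, as in section 2
import org.apache.doris.flink.tools.cdc.CdcTools
import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.functions.async.{AsyncFunction, ResultFuture}

// createDS: the CREATE records obtained with getSideOutput(ddlSideOutput)
AsyncDataStream.unorderedWait(createDS, new AsyncFunction[JSONObject, String] {
  override def asyncInvoke(input: JSONObject, resultFuture: ResultFuture[String]): Unit = {
    val tableChange = input.getJSONArray("tableChanges").getJSONObject(0)
    val ddlType = tableChange.getString("type")
    if ("CREATE".equalsIgnoreCase(ddlType)) {
      // CdcTools.main is synchronous; calling it directly here would block the
      // operator thread, so hand it to another thread and complete from there
      CompletableFuture.runAsync(() => {
        try {
          CdcTools.main(Array[String](
            "mysql-sync-database",
            "--create-table-only",
            "--database", "test",
            "--mysql-conf", "hostname=192.xx.xxx.xxx",
            "--mysql-conf", "port=3306",
            "--mysql-conf", "username=mysql_user",
            "--mysql-conf", "password=***",
            "--mysql-conf", "database-name=test",
            "--sink-conf", "fenodes=192.168.xxx.xxx:8030",
            "--sink-conf", "username=doris_user",
            "--sink-conf", "password=***",
            "--sink-conf", "jdbc-url=jdbc:mysql://192.168.xxx.xxx:9030",
            "--sink-conf", "sink.label-prefix=label01",
            "--table-conf", "replication_num=2"
          ))
          resultFuture.complete(Collections.singleton("done"))
        } catch {
          case e: Exception => resultFuture.completeExceptionally(e)
        }
      })
    } else {
      resultFuture.complete(Collections.singleton("done"))
    }
  }
  // The default timeout handling fails the record with a TimeoutException, so 1 s is
  // too short once the call really runs asynchronously; 60 s is an assumed upper bound
}, 60, TimeUnit.SECONDS, 1).setParallelism(1)
Normal DML (INSERT/UPDATE/DELETE) continues on the main output stream.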
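Sections 1 and 3 pass the same long argument list to CdcTools.main, so the two copies can easily drift apart. One option is to build the list in a single place. The sketch below is plain Scala with no dependencies; the object CdcToolsArgs and its parameters are hypothetical names, and it emits only the flags already used in this article, in the same order.

```scala
// Hypothetical helper (not part of flink-doris-connector): builds the
// "mysql-sync-database --create-table-only" argument array for CdcTools.main
// from ordered key/value pairs, so every call site shares one definition.
object CdcToolsArgs {

  // Expands pairs into repeated "--flag", "key=value" arguments.
  private def repeatFlag(flag: String, conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq(flag, s"$k=$v") }

  // dorisDatabase is the target Doris database (the "--database" flag).
  def createTableOnly(
      dorisDatabase: String,
      mysqlConf: Seq[(String, String)],
      sinkConf: Seq[(String, String)],
      tableConf: Seq[(String, String)]
  ): Array[String] =
    (Seq("mysql-sync-database", "--create-table-only", "--database", dorisDatabase) ++
      repeatFlag("--mysql-conf", mysqlConf) ++
      repeatFlag("--sink-conf", sinkConf) ++
      repeatFlag("--table-conf", tableConf)).toArray
}
```

Both call sites then reduce to CdcTools.main(CdcToolsArgs.createTableOnly(...)). Sequences of pairs are used instead of a Map so the generated flags keep a predictable order.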
4. Adding Doris Delete‑Sign Field
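Doris Unique Key tables use the hidden column __DORIS_DELETE_SIGN__ to mark deleted rows. Because the sink writes this column (it appears in the column list of the error below), every row must contain the field before it is written to Doris; otherwise the load fails with an error like the following: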
Reason: There is no column matching jsonpaths in the json file, columns:[id, name, __DORIS_DELETE_SIGN__], please check columns and jsonpaths:. src line {"name":"X","id":5}
Adding the field is straightforward:
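// 0 inserts or updates the row; 1 tells Doris to delete it, so DELETE events (op = "d") need 1.
// For DELETE events, build rowJson from the "before" image, since "after" is null.
rowJson.put("__DORIS_DELETE_SIGN__", if ("d" == json.getString("op")) 1 else 0)
5. Summary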
The overall approach is:
Detect CREATE DDL via the historyRecord JSON field.
Route CREATE events to a side‑output stream.
Execute the CREATE DDL asynchronously using CdcTools.main.
Process normal DML on the main stream.
Ensure each row contains the __DORIS_DELETE_SIGN__ column for Doris.
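The five steps above can be checked without a cluster. The sketch below models only the routing decision in plain Scala: the case classes stand in for parsed Flink CDC JSON, and all names (CdcRoutingSketch, SchemaChange, RowChange and the Route types) are hypothetical rather than Flink CDC or Doris APIs. It also makes one detail explicit: DELETE events get a delete sign of 1 rather than 0, since Doris deletes rows whose __DORIS_DELETE_SIGN__ is 1.

```scala
// Dependency-free model of the routing summarized above (hypothetical names).
object CdcRoutingSketch {

  sealed trait CdcEvent
  // A record that carried "historyRecord"; ddlType is tableChanges[0].type.
  final case class SchemaChange(ddlType: String, ddl: String) extends CdcEvent
  // A row change; op uses Debezium codes: "c" insert, "u" update, "d" delete, "r" snapshot read.
  // For "d" events, row is expected to hold the before image (the deleted values).
  final case class RowChange(op: String, row: Map[String, Any]) extends CdcEvent

  sealed trait Route
  // Sent to the CREATE side output, later handled by CdcTools.main.
  final case class CreateSideOutput(ddl: String) extends Route
  // Sent down the main stream to the Doris sink.
  final case class DorisSink(row: Map[String, Any]) extends Route
  // ALTER/DROP are not handled by the approach in this article.
  case object Ignored extends Route

  val DeleteSign = "__DORIS_DELETE_SIGN__"

  def route(event: CdcEvent): Route = event match {
    case SchemaChange(ddlType, ddl) if "CREATE".equalsIgnoreCase(ddlType) =>
      CreateSideOutput(ddl)
    case _: SchemaChange =>
      Ignored
    case RowChange(op, row) =>
      // 1 asks Doris to delete the row; 0 inserts or updates it.
      DorisSink(row + (DeleteSign -> (if (op == "d") 1 else 0)))
  }
}
```

In the job itself, route corresponds to the ProcessFunction from section 2: CreateSideOutput plays the role of ctx.output(ddlSideOutput, ...), and DorisSink the role of the main-stream output that feeds the Doris sink.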
This method reuses existing tooling in the flink‑doris‑connector and avoids hand‑written schema mapping, giving real‑time MySQL‑to‑Doris synchronization with automatic creation of newly added tables.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.