Sync New MySQL Tables to Doris in Real‑Time with Flink CDC and CdcTools

This article explains how to use Flink CDC together with the CdcTools utility to automatically capture newly created MySQL tables and synchronize both their schema and data to a Doris database in real time, covering the required code, side‑output handling, async execution, and a special delete‑sign field.

ITPUB

When Flink CDC replicates an entire MySQL database to Doris, tables created in MySQL after the job has started must be synchronized too, and nothing in the pipeline announces them apart from the DDL event itself. The solution relies on the CdcTools class shipped with the flink-doris-connector, which reads table schemas from MySQL and creates the matching tables in Doris.

1. Using CdcTools

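CdcTools exposes a regular main method, so Flink code can call it with the same arguments as the command-line tool. With --create-table-only, the following call creates the Doris tables for the MySQL database test without starting a synchronization job: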

import org.apache.doris.flink.tools.cdc.CdcTools

// Create the Doris tables for MySQL database "test"; no sync job is started
CdcTools.main(Array[String](
  "mysql-sync-database",
  "--create-table-only",
  "--database", "test",
  "--mysql-conf", "hostname=192.xxx.xx.xxx",
  "--mysql-conf", "port=3306",
  "--mysql-conf", "username=mysql_user",
  "--mysql-conf", "password=****",
  "--mysql-conf", "database-name=test",
  "--sink-conf", "fenodes=192.168.xxx.xx:8030",
  "--sink-conf", "username=doris_user",
  "--sink-conf", "password=****",
  "--sink-conf", "jdbc-url=jdbc:mysql://192.168.xxx.xxx:9030",
  "--sink-conf", "sink.label-prefix=label01",
  "--table-conf", "replication_num=2"
))
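The arguments follow a fixed pattern: a subcommand, optional flags, and repeated --mysql-conf, --sink-conf and --table-conf options that each carry one key=value pair. Because the same list is needed again inside the async function in section 3, a small builder can keep it in one place. CdcArgs is a hypothetical helper, not part of the connector; it only assembles strings and does not call CdcTools itself.

```scala
// Hypothetical helper, not part of flink-doris-connector: assembles the
// argument array for CdcTools.main so every call site shares one config.
object CdcArgs {
  // Turns each key=value pair into a "<flag> key=value" argument pair
  private def flags(flag: String, conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq(flag, s"$k=$v") }

  def createTableOnly(database: String,
                      mysqlConf: Seq[(String, String)],
                      sinkConf: Seq[(String, String)],
                      tableConf: Seq[(String, String)]): Array[String] =
    (Seq("mysql-sync-database", "--create-table-only", "--database", database) ++
      flags("--mysql-conf", mysqlConf) ++
      flags("--sink-conf", sinkConf) ++
      flags("--table-conf", tableConf)).toArray
}

// Example: the same arguments as the call above, with the same masked values
val cdcArgs = CdcArgs.createTableOnly(
  "test",
  Seq("hostname" -> "192.xxx.xx.xxx", "port" -> "3306", "username" -> "mysql_user",
      "password" -> "****", "database-name" -> "test"),
  Seq("fenodes" -> "192.168.xxx.xx:8030", "username" -> "doris_user", "password" -> "****",
      "jdbc-url" -> "jdbc:mysql://192.168.xxx.xxx:9030", "sink.label-prefix" -> "label01"),
  Seq("replication_num" -> "2"))
// CdcTools.main(cdcArgs)
```

A Seq of pairs is used instead of a Map so the generated list stays in a predictable order.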

2. Detecting CREATE DDL

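When the MySQL source is built with includeSchemaChanges(true) and a JSON deserializer such as JsonDebeziumDeserializationSchema, Flink CDC emits a record containing a historyRecord field for every DDL event. The type of the first entry in its tableChanges array (CREATE, ALTER or DROP) separates CREATE statements from other DDL, while records without historyRecord are ordinary DML. The following ProcessFunction (SplitDdl is an illustrative name), which assumes fastjson and String input records, routes CREATE events to a side output:

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.{Collector, OutputTag}
// Routes CREATE TABLE events to ddlSideOutput and passes DML through unchanged
class SplitDdl(ddlSideOutput: OutputTag[JSONObject]) extends ProcessFunction[String, JSONObject] {
  override def processElement(value: String, ctx: ProcessFunction[String, JSONObject]#Context,
                              out: Collector[JSONObject]): Unit = {
    val json = JSON.parseObject(value)
    if (json.containsKey("historyRecord")) {
      // historyRecord arrives as a JSON string; getJSONObject parses it
      val histJson = json.getJSONObject("historyRecord")
      val changes = histJson.getJSONArray("tableChanges")
      if (changes != null && !changes.isEmpty) {
        val ddlType = changes.getJSONObject(0).getString("type") // ALTER/CREATE/DROP
        if ("CREATE".equalsIgnoreCase(ddlType)) ctx.output(ddlSideOutput, histJson)
      }
    } else out.collect(json) // INSERT/UPDATE/DELETE stay on the main output
  }
}

Create the tag with new OutputTag[JSONObject]("ddl-create", TypeInformation.of(classOf[JSONObject])) (the tag name is arbitrary) and obtain the CREATE stream with getSideOutput on the operator returned by process.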
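To check the routing rule in a unit test without starting a Flink job, it can be modelled on plain Scala types. The event classes and DdlRouter are hypothetical stand-ins for the parsed JSON records, not Flink or connector APIs; the sketch only mirrors the decision made in the code above.

```scala
// Hypothetical, Flink-free model of the routing rule: CREATE events go to the
// side output, DML stays on the main output, ALTER/DROP events are dropped.
sealed trait CdcEvent
final case class DdlEvent(changeType: String, table: String) extends CdcEvent
final case class DmlEvent(op: String, table: String) extends CdcEvent

object DdlRouter {
  // Returns (events for the CREATE side output, events for the main output)
  def route(events: Seq[CdcEvent]): (Seq[DdlEvent], Seq[DmlEvent]) = {
    val creates = events.collect {
      case d: DdlEvent if "CREATE".equalsIgnoreCase(d.changeType) => d
    }
    val dml = events.collect { case m: DmlEvent => m }
    (creates, dml)
  }
}
```

Keeping the rule in a pure function like this makes it easy to extend (for example, to forward ALTER events later) and test before wiring it into the ProcessFunction.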

3. Asynchronous Execution of CREATE

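CREATE events are sent to a side output stream and handled by Flink's async I/O operator so that table creation does not stall the pipeline. Async I/O only achieves this if asyncInvoke returns quickly, so the blocking CdcTools.main call runs on a separate thread and the ResultFuture is completed from its callback. A capacity of 1 with parallelism 1 keeps CREATE handling strictly sequential. The timeout must exceed the time table creation takes, because by default a timed-out request completes exceptionally and fails the job.

import java.util.Collections
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.{Failure, Success}
import com.alibaba.fastjson.JSONObject
import org.apache.doris.flink.tools.cdc.CdcTools
import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.functions.async.{AsyncFunction, ResultFuture}

// createDS: the CREATE side-output stream (a Java DataStream[JSONObject])
AsyncDataStream.unorderedWait(createDS, new AsyncFunction[JSONObject, String] {
  override def asyncInvoke(input: JSONObject, resultFuture: ResultFuture[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // Hand the blocking call to another thread so asyncInvoke returns immediately
    Future(blocking {
      CdcTools.main(Array[String](
        "mysql-sync-database",
        "--create-table-only",
        "--database", "test",
        "--mysql-conf", "hostname=192.xx.xxx.xxx",
        "--mysql-conf", "port=3306",
        "--mysql-conf", "username=mysql_user",
        "--mysql-conf", "password=***",
        "--mysql-conf", "database-name=test",
        "--sink-conf", "fenodes=192.168.xxx.xxx:8030",
        "--sink-conf", "username=doris_user",
        "--sink-conf", "password=***",
        "--sink-conf", "jdbc-url=jdbc:mysql://192.168.xxx.xxx:9030",
        "--sink-conf", "sink.label-prefix=label01",
        "--table-conf", "replication_num=2"
      ))
    }).onComplete {
      case Success(_) => resultFuture.complete(Collections.singleton("done"))
      case Failure(e) => resultFuture.completeExceptionally(e)
    }
  }
}, 60, TimeUnit.SECONDS, 1).setParallelism(1) // 60 s is an example; it must exceed table-creation time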

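Normal DML (INSERT/UPDATE/DELETE) continues on the main output stream. Because the two branches run independently, nothing guarantees that a new table already exists in Doris when its first rows reach the sink, so the DML path has to tolerate, or retry, writes to a table that is still being created. Also check how your connector version implements --create-table-only before running it inside a job: if the tool ends the JVM after creating the tables (for example with System.exit), calling it from a task would stop the TaskManager as well.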
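The offloading pattern used in asyncInvoke can be tried without Flink. In this hypothetical sketch a Scala Promise stands in for Flink's ResultFuture and createTables stands in for the CdcTools.main call; AsyncCreate is an illustrative name, not a Flink API.

```scala
import scala.concurrent.{Future, Promise, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Flink-free sketch of the asyncInvoke pattern. AsyncCreate is hypothetical and
// the Promise stands in for Flink's ResultFuture.
object AsyncCreate {
  def asyncInvoke(createTables: () => Unit): Future[String] = {
    val result = Promise[String]()
    // The blocking call runs on a pool thread; `blocking` lets the global pool add threads
    Future(blocking(createTables())).onComplete {
      case Success(_) => result.success("done")
      case Failure(e) => result.failure(e)
    }
    result.future // handed back at once; the calling thread never waits
  }
}
```

The caller gets a handle back while the work is still running, and a failure in the blocking call surfaces as a failed result rather than an exception on the calling thread, which matches how completeExceptionally is used in the Flink version.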

4. Adding Doris Delete‑Sign Field

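Doris Unique Key tables have a hidden column, __DORIS_DELETE_SIGN__, that marks a row as deleted (1) or live (0). When the sink includes it in the load's column list, every JSON row written to Doris must contain the field; otherwise Stream Load fails with an error like the following:

Reason: There is no column matching jsonpaths in the json file, columns:[id, name, __DORIS_DELETE_SIGN__], please check columns and jsonpaths:. src line {"name":"X","id":5}

Use 0 for inserts and updates. For DELETE events the value must be 1; otherwise Doris treats the delete as an upsert and the row is kept:

val isDelete = "d" == json.getString("op") // Debezium op code for DELETE
rowJson.put("__DORIS_DELETE_SIGN__", if (isDelete) 1 else 0)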
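Where the row image comes from matters as much as the flag. For a DELETE, Debezium puts the removed row in before and sets after to null, so the row sent to Doris has to be built from before with the sign set to 1. The hypothetical DorisRow helper shows the rule in isolation, modelling rows as plain Scala maps instead of fastjson objects.

```scala
// Hypothetical helper: builds the row sent to Doris from a Debezium change event.
// Debezium op codes: "c" insert, "r" snapshot read, "u" update, "d" delete.
object DorisRow {
  val DeleteSign = "__DORIS_DELETE_SIGN__"

  def fromChange(op: String,
                 before: Map[String, Any],
                 after: Map[String, Any]): Map[String, Any] =
    if (op == "d") before.updated(DeleteSign, 1) // old image, marked deleted
    else after.updated(DeleteSign, 0)            // new image, kept as a live row
}
```

Snapshot reads ("r") are treated like inserts, since they carry the current state of an existing row.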

5. Summary

The overall approach is:

Detect CREATE DDL via the historyRecord JSON field.

Route CREATE events to a side‑output stream.

Create the matching Doris tables asynchronously by calling CdcTools.main.

Process normal DML on the main stream.

Ensure each row sent to Doris contains the __DORIS_DELETE_SIGN__ field: 0 for inserts and updates, 1 for deletes.

This method reuses existing tooling in the flink-doris-connector, avoids manual schema mapping, and gives a practical basis for real-time MySQL-to-Doris synchronization with automatic table creation.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

ITPUB
