
Real‑Time Sync of New MySQL Tables to Doris Using Flink CDC

This article explains how to extend a Flink CDC job that already syncs an entire MySQL database to Doris so that newly created tables are automatically created in Doris in real time, using the CdcTools utility, side‑output streams, and asynchronous I/O.

ITPUB

0. Feasible Solution

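The hardest part is mapping MySQL table definitions to Doris table definitions inside Flink code: writing the column-by-column type mapping by hand is error-prone and tedious. The flink-doris-connector already performs this mapping when it creates Doris tables for whole-database sync, so the solution reuses it instead of reimplementing it.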
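To see why a hand-written mapping is fragile, here is a deliberately partial, hypothetical one (ManualTypeMapping is an illustrative name, not part of any library). The VARCHAR length scaling and the 65533-byte cutoff are assumptions about how Doris byte lengths relate to MySQL character lengths; every other MySQL type would need its own case.

```scala
// Hypothetical, deliberately incomplete MySQL -> Doris column-type mapping.
// Each additional MySQL type, precision rule or edge case must be added by hand.
object ManualTypeMapping {
  // Assumption: MySQL VARCHAR(n) counts characters while Doris VARCHAR counts bytes,
  // so n is scaled by 3 (UTF-8, up to 3 bytes per char here) and falls back to
  // STRING once it exceeds the 65533-byte Doris VARCHAR limit.
  private val VarcharPattern = """(?i)varchar\((\d+)\)""".r

  def toDorisType(mysqlType: String): String = mysqlType.trim.toLowerCase match {
    case "tinyint(1)"      => "BOOLEAN" // MySQL's conventional boolean column
    case "int" | "integer" => "INT"
    case "bigint"          => "BIGINT"
    case "datetime"        => "DATETIME"
    case VarcharPattern(len) =>
      val bytes = len.toInt * 3
      if (bytes > 65533) "STRING" else s"VARCHAR($bytes)"
    case other =>
      // Any type not listed above is a gap that only shows up at runtime
      throw new IllegalArgumentException(s"No mapping for MySQL type: $other")
  }
}
```

Each missing case surfaces only when a table using that type appears, which is the maintenance burden the CdcTools approach avoids.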

1. CdcTools Invocation

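The CdcTools class in flink-doris-connector is the entry point of the connector's whole-database sync tool (mysql-sync-database). With --create-table-only, it only synchronizes table structure: it creates the Doris tables from the MySQL schemas without copying any data. It is normally launched from the command line, but because it is invoked through an ordinary main method, the same call can be embedded in Flink code: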

import org.apache.doris.flink.tools.cdc.CdcTools

// Schema-only run: create the Doris tables for the MySQL database "test", no data sync
CdcTools.main(Array[String](
  "mysql-sync-database",
  "--create-table-only",
  "--database", "test",                            // target Doris database
  "--mysql-conf", "hostname=192.xxx.xx.xxx",
  "--mysql-conf", "port=3306",
  "--mysql-conf", "username=mysql_user",
  "--mysql-conf", "password=****",
  "--mysql-conf", "database-name=test",            // source MySQL database
  "--sink-conf", "fenodes=192.168.xxx.xx:8030",
  "--sink-conf", "username=doris_user",
  "--sink-conf", "password=****",
  "--sink-conf", "jdbc-url=jdbc:mysql://192.168.xxx.xxx:9030",
  "--sink-conf", "sink.label-prefix=label01",
  "--table-conf", "replication_num=2"              // properties of the created Doris tables
))
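Because the same flag list has to be passed again from inside the streaming job, it can help to build it in one place. A minimal sketch, assuming a hypothetical helper named CdcToolsArgs.build that only assembles strings; it does not validate flag names against the connector:

```scala
// Hypothetical helper: assembles the CdcTools argument array from key/value pairs
// so the create-table call can be reused without copy-pasting every flag.
object CdcToolsArgs {
  def build(
      database: String,
      mysqlConf: Seq[(String, String)],
      sinkConf: Seq[(String, String)],
      tableConf: Seq[(String, String)]
  ): Array[String] = {
    // Each (key, value) pair becomes the two arguments "--<group>-conf", "key=value"
    def flags(group: String, conf: Seq[(String, String)]): Seq[String] =
      conf.flatMap { case (k, v) => Seq(s"--$group-conf", s"$k=$v") }

    (Seq("mysql-sync-database", "--create-table-only", "--database", database) ++
      flags("mysql", mysqlConf) ++
      flags("sink", sinkConf) ++
      flags("table", tableConf)).toArray
  }
}
```

The result would then be passed as CdcTools.main(CdcToolsArgs.build("test", mysqlConf, sinkConf, tableConf)), where the three sequences hold the key=value pairs shown above. Using Seq rather than Map keeps the flags in a predictable order.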

2. Flink CDC Implementation Strategy

Two kinds of events need different handling:

Create‑table DDL – should be sent to a side output stream.

Insert/Update/Delete DML – can stay in the main output stream and be written to Doris with the connector's sink (DorisSink).

Side‑output streams allow the DDL branch to be processed independently, for example with asynchronous I/O to avoid blocking the main data flow.
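The routing rule can be modeled without Flink to make the split explicit. In this plain-Scala sketch, the hypothetical types CdcEvent, SchemaChange and RowChange stand in for parsed CDC records; in a real job the two branches correspond to ctx.output(...) and out.collect(...) inside a ProcessFunction, and the side stream is read back with getSideOutput. As in the article's code, only CREATE statements are routed; ALTER and DROP events are simply dropped here.

```scala
// Plain-Scala model (no Flink dependency) of splitting CDC events into a main
// output (DML rows) and a side output (CREATE TABLE events).
sealed trait CdcEvent
final case class SchemaChange(ddlType: String, table: String) extends CdcEvent
final case class RowChange(op: String, table: String, row: Map[String, Any]) extends CdcEvent

object EventRouter {
  /** Returns (mainOutput, sideOutput). */
  def route(events: Seq[CdcEvent]): (Seq[RowChange], Seq[SchemaChange]) = {
    // Main stream: every DML row change (~ out.collect)
    val main = events.collect { case r: RowChange => r }
    // Side stream: only CREATE statements (~ ctx.output(ddlSideOutput, ...))
    val side = events.collect {
      case s: SchemaChange if s.ddlType.equalsIgnoreCase("CREATE") => s
    }
    (main, side)
  }
}
```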

3. Core Flink Coding

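First, detect whether a record contains a DDL payload by checking for the historyRecord key in the JSON emitted by Flink CDC. This key appears only on schema-change records, which the MySQL source emits when includeSchemaChanges(true) is set: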

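// Assumed context: processElement of a ProcessFunction, where json is the record
// parsed with fastjson (JSONObject) and ddlSideOutput is an OutputTag[JSONObject]
if (json.containsKey("historyRecord")) {
  // historyRecord is present only on schema-change (DDL) records
  val histJson = json.getJSONObject("historyRecord")
  if (histJson.containsKey("tableChanges") && !histJson.getJSONArray("tableChanges").isEmpty) {
    val tableChange = histJson.getJSONArray("tableChanges").getJSONObject(0)
    val ddlType = tableChange.getString("type") // ALTER / CREATE / DROP
    if ("CREATE".equalsIgnoreCase(ddlType)) {
      // Route CREATE TABLE events to the side output
      ctx.output(ddlSideOutput, histJson)
    }
  }
}
// Records without historyRecord are DML and stay on the main stream (out.collect)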

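For the DDL side stream (createDS, obtained from the main operator with getSideOutput(ddlSideOutput)), use AsyncDataStream.unorderedWait so that invoking CdcTools does not block the DML pipeline: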

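// Java DataStream API; assumed imports: org.apache.flink.streaming.api.datastream.AsyncDataStream,
// org.apache.flink.streaming.api.functions.async.{AsyncFunction, ResultFuture},
// java.util.Collections, java.util.concurrent.{CompletableFuture, TimeUnit}
AsyncDataStream.unorderedWait(createDS,
  new AsyncFunction[JSONObject, String] {
    override def asyncInvoke(input: JSONObject, resultFuture: ResultFuture[String]): Unit = {
      // asyncInvoke runs on the operator thread, so hand the blocking CdcTools call
      // (MySQL metadata queries plus CREATE TABLE in Doris) to another thread
      CompletableFuture.runAsync(new Runnable {
        override def run(): Unit =
          try {
            val tableChange = input.getJSONArray("tableChanges").getJSONObject(0)
            val ddlType = tableChange.getString("type")
            if ("CREATE".equalsIgnoreCase(ddlType)) {
              CdcTools.main(Array[String](
                "mysql-sync-database",
                "--create-table-only",
                "--database", "test",
                // ... same MySQL and Doris parameters as above ...
              ))
            }
            resultFuture.complete(Collections.singleton("done"))
          } catch {
            case e: Exception => resultFuture.completeExceptionally(e)
          }
      })
    }
  },
  // Table creation can easily exceed 1 s; on timeout the default
  // AsyncFunction.timeout() fails the job with a TimeoutException
  60, TimeUnit.SECONDS, 1).setParallelism(1)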
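The point of the async branch is that the caller gets control back immediately while the blocking table creation runs elsewhere. Below is a plain-Scala sketch of that contract using scala.concurrent on a dedicated single-thread pool. It is a stand-in for Flink's async operator, not its implementation; createTable is a hypothetical callback that would wrap the CdcTools.main call, and the Promise plays the role of ResultFuture.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object AsyncTableCreator {
  // One worker thread: table creations run one at a time, never on the caller's thread
  private val pool = Executors.newSingleThreadExecutor()
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  /** Returns immediately; the Promise completes once createTable finishes or fails. */
  def submit(table: String)(createTable: String => Unit): Future[String] = {
    val promise = Promise[String]() // plays the role of Flink's ResultFuture
    Future(createTable(table)).onComplete {
      case Success(_) => promise.success(s"created $table") // ~ resultFuture.complete(...)
      case Failure(e) => promise.failure(e)                  // ~ resultFuture.completeExceptionally(e)
    }
    promise.future
  }

  /** Stops the worker thread so the JVM can exit. */
  def shutdown(): Unit = pool.shutdown()
}
```

A single worker keeps table creations serialized, which matches the parallelism of 1 and capacity of 1 used in the Flink code above.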

The main stream continues to write DML records to Doris using the standard sink.

4. Important Note

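Doris Unique Key tables carry a hidden delete-sign column, __DORIS_DELETE_SIGN__, that marks rows as deleted. When the load's column list includes this column (as it does in the error below), every forwarded DML row must contain the field: 0 for inserts and updates, 1 for deletes. Otherwise Doris rejects the record.

// op is the Debezium operation code: "c" = insert, "r" = snapshot read, "u" = update, "d" = delete.
// For deletes, rowJson should be built from the "before" image so the key identifies the row to remove.
rowJson.put("__DORIS_DELETE_SIGN__", if ("d" == json.getString("op")) 1 else 0)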

If the column is missing, Doris returns an error such as:

Reason: There is no column matching jsonpaths in the json file, columns:[id, name, __DORIS_DELETE_SIGN__], please check columns and jsonpaths: {"name":"X","id":5}
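The delete-sign rule can be captured in one function. This sketch assumes the change record has already been parsed into a Scala Map with the Debezium envelope fields op, before and after (DorisRowBuilder is a hypothetical name); it takes the after image for inserts, snapshot reads and updates, and the before image for deletes.

```scala
// Plain-Scala sketch: turn a parsed Debezium-style change record into the row
// sent to Doris, always including the __DORIS_DELETE_SIGN__ field.
object DorisRowBuilder {
  val DeleteSign = "__DORIS_DELETE_SIGN__"

  def toDorisRow(record: Map[String, Any]): Map[String, Any] = {
    // Extract the "before" or "after" row image, failing loudly if it is absent
    def image(field: String): Map[String, Any] = record.get(field) match {
      case Some(m: Map[_, _]) => m.asInstanceOf[Map[String, Any]]
      case _                  => throw new IllegalArgumentException(s"missing '$field' image")
    }
    record.get("op") match {
      // c = insert, r = snapshot read, u = update: keep the row (delete sign 0)
      case Some("c" | "r" | "u") => image("after") + (DeleteSign -> 0)
      // d = delete: send the old row with delete sign 1 so Doris removes it
      case Some("d") => image("before") + (DeleteSign -> 1)
      case other     => throw new IllegalArgumentException(s"unsupported op: $other")
    }
  }
}
```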

By reusing the connector's built-in CdcTools for table creation and separating DDL from DML, the job creates newly added MySQL tables in Doris in real time without hand-written type mappings, while the asynchronous DDL branch keeps table creation from stalling DML writes.
