
How to Sync MySQL ALTER DDL to Doris Using Flink CDC (Step‑by‑Step)

This guide explains how to extend a Flink CDC pipeline so that, in addition to real‑time data replication, DDL ALTER statements from MySQL are captured, split from the data stream, and applied to Doris using side‑outputs and a custom JDBC sink.

ITPUB

Implementation Overview

When Flink CDC is used for full-database synchronization, row changes are easy to handle, but propagating DDL in real time, especially ALTER statements, needs extra work.

Detailed Steps

Data Splitting

The first step is to separate DDL records from regular row changes. In the JSON emitted by the CDC source, a schema-change event carries a historyRecord field, which row changes lack. The process function checks for this field and for a non-empty tableChanges array, routes ALTER events to a side output, and forwards row changes to the main stream.

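// Inside processElement of a ProcessFunction[String, JSONObject] (fastjson assumed), with e.g.
// ddlSideOutput = new OutputTag[JSONObject]("ddl-alter", TypeInformation.of(classOf[JSONObject]))
val json = JSON.parseObject(value)
if (json.containsKey("historyRecord")) {
    // Schema-change event: historyRecord is serialized JSON carrying "ddl" and "tableChanges"
    val histJson = JSON.parseObject(json.getString("historyRecord"))
    val tableChanges = histJson.getJSONArray("tableChanges")
    if (tableChanges != null && !tableChanges.isEmpty) {
        val ddlType = tableChanges.getJSONObject(0).getString("type") // ALTER/CREATE/DROP
        if ("ALTER".equalsIgnoreCase(ddlType)) {
            ctx.output(ddlSideOutput, histJson)
        }
        // CREATE and DROP events are not forwarded by this branch
    }
} else {
    // Row change (insert/update/delete): forward to the main stream
    out.collect(json)
}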
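The branching above has three possible outcomes. The following dependency-free sketch makes them explicit by replacing the fastjson objects with hypothetical case classes (HistoryRecord, CdcRecord) and a Route type, so the decision logic can be tested without Flink. Like the snippet above, it inspects only the first tableChanges entry.

```scala
// Hypothetical, simplified model of a parsed CDC JSON record.
final case class HistoryRecord(ddl: String, tableChangeTypes: Seq[String])
final case class CdcRecord(historyRecord: Option[HistoryRecord])

sealed trait Route
case object MainStream extends Route      // row change: out.collect(...)
case object AlterSideOutput extends Route // ALTER DDL: ctx.output(ddlSideOutput, ...)
case object Discarded extends Route       // other DDL, or DDL with no tableChanges

object DdlRouter {
  // Mirrors the branching above: only the first tableChanges entry's type is checked.
  def route(record: CdcRecord): Route = record.historyRecord match {
    case None => MainStream
    case Some(h) =>
      h.tableChangeTypes.headOption match {
        case Some(t) if t.equalsIgnoreCase("ALTER") => AlterSideOutput
        case _                                      => Discarded
      }
  }
}
```

CREATE and DROP events land in Discarded here; propagating them would need additional branches.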

Processing the ALTER Stream

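The side-output stream, obtained with getSideOutput(ddlSideOutput) on the processed stream, carries the history records of ALTER events. Each record's ddl field holds the raw ALTER statement (e.g., alter table test_dist add column c03 int after addr), which a JDBC sink then executes against Doris.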

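.addSink(new RichSinkFunction[JSONObject] {
    // Doris FE speaks the MySQL protocol on its query port (9030 by default)
    private val url = "jdbc:mysql://192.168.xxx.xxx:9030/test"
    private val user = "user"
    private val password = "****"
    private var conn: Connection = null

    override def open(parameters: Configuration): Unit = {
        // Legacy Connector/J 5.x class name; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection(url, user, password)
    }

    override def invoke(value: JSONObject, context: SinkFunction.Context): Unit = {
        // "ddl" holds the raw ALTER statement from the history record
        val ddlSql = value.getString("ddl")
        val stmt = conn.createStatement()
        try stmt.execute(ddlSql) finally stmt.close() // always release the statement
    }

    override def close(): Unit = {
        if (conn != null) conn.close()
    }
}).setParallelism(1) // a single sink instance applies DDL statements one at a time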
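Doris runs heavyweight schema changes (such as changing a column type) as asynchronous jobs, tracked with SHOW ALTER TABLE COLUMN, and allows only one such job per table at a time, so a second ALTER arriving quickly for the same table may be rejected. Below is a minimal sketch of a bounded retry around statement execution. The DdlRetry.applyWithRetry helper is hypothetical, it treats every SQLException as retryable (a production sink should inspect the error first), and exec stands in for the JDBC call in invoke so the logic can be tested without a database.

```scala
import java.sql.SQLException
import scala.annotation.tailrec

object DdlRetry {
  // Hypothetical helper: runs exec(ddl) up to maxAttempts times with linear backoff.
  // Returns the number of attempts used; rethrows the last SQLException when exhausted.
  def applyWithRetry(ddl: String, exec: String => Unit,
                     maxAttempts: Int = 5, backoffMs: Long = 1000L): Int = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    @tailrec
    def loop(attempt: Int): Int = {
      val failed =
        try { exec(ddl); false }
        catch { case _: SQLException if attempt < maxAttempts => true }
      if (!failed) attempt
      else {
        Thread.sleep(backoffMs * attempt) // wait longer after each failure
        loop(attempt + 1)
      }
    }

    loop(1)
  }
}
```

Inside invoke, the call could look like DdlRetry.applyWithRetry(ddlSql, sql => { val s = conn.createStatement(); try s.execute(sql) finally s.close() }). Because the sink runs with parallelism 1, a retry also delays the DDL statements queued behind it, which keeps them in order.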

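Passing the raw SQL straight through assumes that MySQL and Doris accept the same ALTER syntax. This holds for common column operations such as the ADD COLUMN ... AFTER statement above, but MySQL-specific clauses (for example CHANGE COLUMN or a column-level CHARACTER SET) are likely to be rejected by Doris and would need translating first.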
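A rough pre-check can catch such statements before they reach Doris, for example to route them to manual review instead of failing the sink. The sketch below is keyword-based; its deny-list is an assumption, not an authoritative list of what Doris rejects, and DdlCompat with its methods are hypothetical names.

```scala
object DdlCompat {
  // Hypothetical deny-list of MySQL ALTER clauses this sketch assumes Doris will reject.
  // Adjust it for the Doris version in use.
  private val mysqlOnlyPatterns = Seq(
    """(?i)\bchange\s+column\b""",
    """(?i)\bcharacter\s+set\b""",
    """(?i)\bcollate\b""",
    """(?i)\balgorithm\s*=""",
    """(?i)\block\s*="""
  ).map(_.r)

  // Returns the patterns that matched; an empty result means "try executing as-is".
  def incompatibleClauses(ddl: String): Seq[String] =
    mysqlOnlyPatterns.filter(_.findFirstIn(ddl).isDefined).map(_.regex)

  def looksCompatible(ddl: String): Boolean = incompatibleClauses(ddl).isEmpty
}
```

A keyword match is deliberately conservative: it may flag a statement Doris would accept, but it never rewrites SQL, so a false positive only costs a manual check.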

Handling the Main Data Stream

The main data stream is written to Doris as before, through the officially recommended Doris Flink connector sink, with no modification.

Important Considerations

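Several CDC options must be enabled for DDL events to be captured (e.g., include-schema-changes and snapshot-mode). The essential one is the schema-change switch, which in the Flink CDC MySqlSource DataStream builder is includeSchemaChanges(true); without it the source emits only row changes, and upstream DDL never reaches the pipeline.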
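For reference, here is a configuration sketch of a MySQL source that emits schema changes, written against the Flink CDC 2.x DataStream API (packages under com.ververica; Flink CDC 3.1 and later moved them to org.apache.flink.cdc). The host name, database and table list, and startup option are placeholders assumed for illustration; the credentials mirror the masked values used above.

```scala
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema

object CdcSourceConfig {
  // Placeholder connection settings; replace them with the real MySQL instance.
  val source: MySqlSource[String] = MySqlSource.builder[String]()
    .hostname("mysql-host")                                // hypothetical host name
    .port(3306)
    .databaseList("test")
    .tableList("test.*")                                   // regex over database.table
    .username("user")
    .password("****")
    .startupOptions(StartupOptions.initial())              // snapshot first, then read the binlog
    .deserializer(new JsonDebeziumDeserializationSchema()) // emits Debezium-style JSON strings
    .includeSchemaChanges(true)                            // also emit DDL (historyRecord) events
    .build()
}
```

The resulting source would be passed to env.fromSource(...) and feed the process function that splits DDL from row changes.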

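When syncing a whole database, deletes must be propagated as well. This requires the Doris target tables to have Doris's hidden delete-sign column (__DORIS_DELETE_SIGN__, available on Unique Key tables with batch delete enabled), so that a delete event can be written as a row marked for deletion.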
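If the sink in use does not already fill in the delete sign, the mapping is straightforward: Debezium's op field marks deletes with d, and for a delete the row image is in before rather than after. A dependency-free sketch, in which the DeleteSign object and its row-as-Map representation are hypothetical:

```scala
object DeleteSign {
  // Debezium op codes: c = create, u = update, d = delete, r = snapshot read.
  // Returns the value for Doris's hidden __DORIS_DELETE_SIGN__ column (1 = deleted).
  def deleteSign(op: String): Int = op match {
    case "d"             => 1
    case "c" | "u" | "r" => 0
    case other           => throw new IllegalArgumentException(s"unexpected Debezium op: $other")
  }

  // Hypothetical row builder: deletes carry the row image in "before", other ops in "after".
  def toDorisRow(op: String, before: Map[String, Any], after: Map[String, Any]): Map[String, Any] = {
    val image = if (op == "d") before else after
    image + ("__DORIS_DELETE_SIGN__" -> deleteSign(op))
  }
}
```

Only the key columns of a deleted row matter to Doris here; the remaining values in the before image are carried along unchanged.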

CDC configuration illustration

In summary, CREATE statements can be handled by the Doris Flink connector's built-in CdcTools utility, while ALTER statements require custom extraction, side-output routing, and a JDBC sink to keep the downstream Doris schema in sync.

Written by ITPUB
