How to Sync MySQL ALTER DDL to Doris Using Flink CDC (Step‑by‑Step)
This guide explains how to extend a Flink CDC pipeline so that, in addition to real‑time data replication, DDL ALTER statements from MySQL are captured, split from the data stream, and applied to Doris using side‑outputs and a custom JDBC sink.
Implementation Overview
When using Flink CDC for full‑database synchronization, the pipeline can handle data changes easily, but real‑time DDL propagation—especially ALTER statements—requires extra handling.
Detailed Steps
Data Splitting
Flink first separates DDL records from regular row changes. DDL events carry a historyRecord field in the incoming JSON. By checking for this field and for a non-empty tableChanges array inside it, the pipeline routes ALTER events to a side output and forwards normal data to the main stream.
if (json.containsKey("historyRecord")) {
  // DDL event: inspect the schema-history payload
  val histJson = json.getJSONObject("historyRecord")
  if (histJson.containsKey("tableChanges") && !histJson.getJSONArray("tableChanges").isEmpty) {
    val tableChange = histJson.getJSONArray("tableChanges").getJSONObject(0)
    val ddlType = tableChange.getString("type") // ALTER/CREATE/DROP
    if ("ALTER".equalsIgnoreCase(ddlType)) {
      // Only ALTER goes to the side output; other DDL types are dropped here
      ctx.output(ddlSideOutput, histJson)
    }
  }
} else {
  // main stream
  out.collect(json)
}
Processing the ALTER Stream
The pipeline then reads the side output, takes the raw ALTER SQL from each record (e.g., alter table test_dist add column c03 int after addr), and executes it against Doris through a JDBC sink.
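.addSink(new RichSinkFunction[JSONObject]() {
  // Doris accepts MySQL-protocol connections, so a MySQL JDBC URL is used
  private val url = "jdbc:mysql://192.168.xxx.xxx:9030/test"
  private val user = "user"
  private val password = "****"
  private var conn: Connection = null

  override def open(parameters: Configuration): Unit = {
    // Connector/J 5.x class name; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(url, user, password)
  }

  override def invoke(value: JSONObject, context: SinkFunction.Context): Unit = {
    val ddlSql = value.getString("ddl")
    // DDL has no bind parameters, so a plain Statement suffices; close it every time
    val stmt = conn.createStatement()
    try stmt.execute(ddlSql) finally stmt.close()
  }

  // Release the JDBC connection when the sink shuts down
  override def close(): Unit = {
    if (conn != null) conn.close()
  }
}).setParallelism(1)
This approach assumes MySQL and Doris share compatible ALTER syntax for most common column types, allowing the raw SQL to be executed directly. Statements that rely on MySQL-only syntax or types need to be rewritten or skipped before execution.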
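Because the raw SQL is replayed unchanged, it can help to check each statement before handing it to the sink. The following is a minimal sketch of such a check, not part of the original pipeline: the AlterGuard object, its allow-list of ADD/DROP/MODIFY COLUMN forms, and the isReplayable name are all hypothetical, and the list should be adjusted to whatever your Doris version actually accepts.

```scala
object AlterGuard {
  // Hypothetical allow-list: simple column-level ALTER TABLE forms
  // that this sketch assumes can be replayed on Doris unchanged.
  private val Supported =
    """(?is)^\s*alter\s+table\s+\S+\s+(add|drop|modify)\s+column\b.*""".r

  /** True when the statement looks like a simple column-level ALTER. */
  def isReplayable(ddl: String): Boolean =
    ddl != null && Supported.pattern.matcher(ddl).matches()
}
```

The check is deliberately conservative: anything that does not match (including MySQL's shorthand that omits the COLUMN keyword) is rejected, so it can be logged for manual review instead of failing the job.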
Handling the Main Data Stream
The primary data stream continues to use Doris's recommended write API (e.g., the Doris‑CDC sink) without modification.
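To show how the main stream and the ALTER stream relate, here is a rough sketch of the wiring using Flink's Scala DataStream API and fastjson. The DdlRouting object, the split method, and the tag id "ddl-alter" are illustrative names rather than anything from the original job, and the ALTER-only filter from the earlier snippet is left out for brevity.

```scala
import com.alibaba.fastjson.JSONObject
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object DdlRouting {
  // Hypothetical tag id; the same tag must be used for output and getSideOutput
  val ddlSideOutput: OutputTag[JSONObject] = OutputTag[JSONObject]("ddl-alter")

  /** Returns (main data stream, DDL side-output stream). */
  def split(events: DataStream[JSONObject]): (DataStream[JSONObject], DataStream[JSONObject]) = {
    val main = events.process(new ProcessFunction[JSONObject, JSONObject] {
      override def processElement(json: JSONObject,
                                  ctx: ProcessFunction[JSONObject, JSONObject]#Context,
                                  out: Collector[JSONObject]): Unit = {
        if (json.containsKey("historyRecord")) {
          // DDL event: route the history payload to the side output
          ctx.output(ddlSideOutput, json.getJSONObject("historyRecord"))
        } else {
          // Row change: stays on the main stream
          out.collect(json)
        }
      }
    })
    (main, main.getSideOutput(ddlSideOutput))
  }
}
```

The first element of the returned pair would feed the Doris write path, and the second would feed the JDBC sink that executes the DDL.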
Important Considerations
To ensure DDL events are captured, the relevant CDC options must be enabled (e.g., include-schema-changes and snapshot-mode). Without them, Flink CDC does not pass upstream DDL changes into the stream.
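As an illustration of the schema-change option, the sketch below builds a MySQL CDC source with includeSchemaChanges(true) through the DataStream builder. It assumes the Flink CDC 2.x package names (com.ververica.cdc...); newer releases moved these classes, and the host, database, and credentials are placeholders. The snapshot-related option mentioned above is not shown because its exact setting depends on the connector version.

```scala
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema

object CdcSourceSketch {
  // Placeholder connection details; replace with your own
  def build(): MySqlSource[String] =
    MySqlSource.builder[String]()
      .hostname("mysql-host")
      .port(3306)
      .databaseList("test")
      .tableList("test.*")            // all tables in the test database
      .username("user")
      .password("****")
      .includeSchemaChanges(true)     // emit DDL events alongside row changes
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
}
```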
When syncing full‑database changes, a special “Doris delete flag” column is required in the target table to represent logical deletions.
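The idea behind the delete flag can be sketched as a small mapping from a Debezium-style change event to a row with a marker column. This is only an illustration: the DeleteSign object and toRow method are hypothetical, the column name __DORIS_DELETE_SIGN__ is assumed to be the hidden delete marker on Doris Unique Key tables, and a Doris connector may already perform this mapping for you.

```scala
object DeleteSign {
  // Assumed name of the Doris delete-marker column
  val Column = "__DORIS_DELETE_SIGN__"

  /** Flattens a change event into a row that carries the delete marker. */
  def toRow(op: String, before: Map[String, Any], after: Map[String, Any]): Map[String, Any] =
    if (op == "d") before + (Column -> 1) // delete: send the old image, flag = 1
    else after + (Column -> 0)            // insert/update/snapshot: new image, flag = 0
}
```

Here op follows the Debezium convention in which "d" denotes a delete and other values carry the new row image in after.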
Overall, CREATE DDL can be handled by the built‑in CdcTools utility, while ALTER statements require custom extraction, side‑output routing, and a JDBC sink to keep the downstream Doris schema in sync.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
