Understanding Flink CDC 2.0: Core Design, Chunk Splitting, and Code Walkthrough
This article introduces Flink CDC 2.0, explains its core design—including chunk splitting for full‑load and incremental reads—provides a Flink SQL example, and walks through the MySQL CDC source implementation with detailed code snippets and processing logic.
Introduction: In August 2021, Flink CDC released version 2.0.0, adding distributed full-load reading, checkpoint support during the snapshot phase, and consistent data synchronization without table locks. This article focuses on the processing logic of Flink CDC 2.0, while briefly touching on FLIP-27 (Refactor Source Interface) and the Debezium APIs.
Example Case: A Flink SQL job reads a MySQL table with the CDC connector, writes records to Kafka in the changelog-json format, and lets us observe the RowKind of each record and the record counts.
```java
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    env.setParallelism(3);
    // Note: incremental sync requires checkpointing.
    env.enableCheckpointing(10000);
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

    tableEnvironment.executeSql("CREATE TABLE demoOrders (\n"
            + "  `order_id` INTEGER,\n"
            + "  `order_date` DATE,\n"
            + "  `order_time` TIMESTAMP(3),\n"
            + "  `quantity` INT,\n"
            + "  `product_id` INT,\n"
            + "  `purchaser` STRING,\n"
            + "  PRIMARY KEY (`order_id`) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'mysql-cdc',\n"
            + "  'hostname' = 'localhost',\n"
            + "  'port' = '3306',\n"
            + "  'username' = 'cdc',\n"
            + "  'password' = '123456',\n"
            + "  'database-name' = 'test',\n"
            + "  'table-name' = 'demo_orders',\n"
            + "  'scan.startup.mode' = 'initial'\n"
            + ")");

    tableEnvironment.executeSql("CREATE TABLE sink (\n"
            + "  `order_id` INTEGER,\n"
            + "  `order_date` DATE,\n"
            + "  `order_time` TIMESTAMP(3),\n"
            + "  `quantity` INT,\n"
            + "  `product_id` INT,\n"
            + "  `purchaser` STRING,\n"
            + "  PRIMARY KEY (`order_id`) NOT ENFORCED\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'topic' = 'mqTest02',\n"
            + "  'format' = 'changelog-json'\n"
            + ")");

    tableEnvironment.executeSql("INSERT INTO sink SELECT * FROM demoOrders");
}
```

The full-load output (sample JSON records) shows a +I insert operation for each row.
```json
{"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
... (other records omitted for brevity) ...
```

After modifying the source table, updates are captured as a -U (UPDATE_BEFORE) / +U (UPDATE_AFTER) pair, and deletions appear as -D.
```json
## Update order 1005
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
{"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}

## Delete order 1000
{"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}
```

Core Design – Chunk Splitting: During the full-load phase, the table is divided into chunks based on the primary key. If the primary key is an evenly distributed auto-increment integer, the chunk boundaries are computed arithmetically from the MIN/MAX of the key; otherwise, non-uniform chunks are built by repeatedly querying the maximum key of the next chunkSize rows.
```sql
-- Compute the primary-key range
SELECT MIN(`order_id`), MAX(`order_id`) FROM demo_orders;
```

```
// Evenly distributed chunks (half-open key ranges)
chunk-0:    [min, min + chunkSize)
chunk-1:    [min + chunkSize, min + 2 * chunkSize)
...
chunk-last: [max, null)
```

For a non-uniform key distribution, the maximum value of the next chunkSize rows is queried to define the end of each chunk.
```sql
-- Get the max id of the next chunkSize rows
SELECT MAX(`order_id`) FROM (
  SELECT `order_id` FROM `demo_orders`
  WHERE `order_id` >= [previousChunkEnd]
  ORDER BY `order_id` ASC
  LIMIT [chunkSize]
) AS T;
```

Full-Load Snapshot Reading: Flink executes a JDBC query for each chunk, records the binlog offset before and after the snapshot read (the low and high watermark), and then corrects the snapshot data with the binlog events between those offsets to guarantee consistency.
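The even-splitting arithmetic can be sketched as follows. This is a minimal illustration under stated assumptions: `ChunkSplitSketch`, `ChunkRange`, and `splitEvenly` are made-up names, not the connector's classes, which implement the real splitting logic.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of even chunk splitting over an integer primary key.
class ChunkSplitSketch {
    // Half-open key range [start, end); a null end means "read to the max key".
    record ChunkRange(long start, Long end) {}

    static List<ChunkRange> splitEvenly(long min, long max, long chunkSize) {
        List<ChunkRange> chunks = new ArrayList<>();
        long start = min;
        // Emit fixed-size chunks while at least chunkSize keys remain.
        while (start + chunkSize < max) {
            chunks.add(new ChunkRange(start, start + chunkSize));
            start += chunkSize;
        }
        // The last chunk is unbounded on the right, so late inserts are covered.
        chunks.add(new ChunkRange(start, null));
        return chunks;
    }
}
```

Each resulting range then drives one per-chunk snapshot query of the form shown next.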
```sql
SELECT * FROM `test`.`demo_orders`
WHERE order_id >= [chunkStart]
  AND order_id < [chunkEnd];
```

Data correction uses the binlog offset obtained via SHOW MASTER STATUS. If no binlog events arrived while the chunk was being read, the snapshot rows are emitted directly; otherwise, the delete, insert, and update events are applied to the snapshot rows in memory before emission.
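The in-memory correction step can be sketched as a keyed merge: snapshot rows are indexed by primary key, then the binlog events captured during the chunk read are replayed on top, so the emitted chunk reflects the state at the high watermark. The event model here (`Op`, `BinlogEvent`, `SnapshotCorrectionSketch`) is illustrative, not Debezium's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of correcting a snapshot chunk with backfilled binlog events.
class SnapshotCorrectionSketch {
    enum Op { INSERT, UPDATE, DELETE }
    record BinlogEvent(long key, Op op, String row) {}

    static Map<Long, String> correct(Map<Long, String> snapshot,
                                     Iterable<BinlogEvent> backfill) {
        Map<Long, String> merged = new LinkedHashMap<>(snapshot);
        for (BinlogEvent e : backfill) {
            if (e.op() == Op.DELETE) {
                merged.remove(e.key());       // row vanished during the snapshot read
            } else {
                merged.put(e.key(), e.row()); // upsert the latest row image
            }
        }
        return merged;
    }
}
```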
Incremental Reading: After all snapshot chunks are processed, the SplitEnumerator creates a single BinlogSplit. The starting offset for the incremental stream is the smallest high watermark among the completed snapshot splits, and a binlog event is emitted only once its offset has passed the high watermark of the snapshot split that covers its key.
```java
// Determine whether to emit a binlog record (simplified).
// Pure binlog phase: the table's maximum high watermark has been passed.
// (In Flink CDC 2.0, isAtOrBefore matches offsets at or after the given
// watermark, despite what the method name suggests.)
if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
    return true; // emit
}
// Otherwise, emit only if the record's key falls into a finished snapshot
// split and its offset has passed that split's high watermark.
if (RecordUtils.splitKeyRangeContains(key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
        && position.isAtOrBefore(splitInfo.getHighWatermark())) {
    return true;
}
return false;
```

Source Enumerator & Reader Initialization: The MySqlSourceEnumerator creates a MySqlSourceReader for each parallel subtask. The enumerator validates that the MySQL version is at least 5.7, that binlog_format is ROW, and that binlog_row_image is FULL, then uses MySqlHybridSplitAssigner to generate the snapshot and binlog splits.
```java
private void syncWithReaders(int[] subtaskIds, Throwable t) {
    if (t != null) {
        throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
    }
    // While the assigner is still waiting for snapshot splits to finish,
    // ask each reader to report its finished snapshot splits.
    if (splitAssigner.waitingForFinishedSplits()) {
        for (int subtaskId : subtaskIds) {
            context.sendEventToSourceReader(subtaskId, new FinishedSnapshotSplitsRequestEvent());
        }
    }
}
```

The reader creates a SplitFetcher thread that adds assigned splits to a task queue and then fetches data through the Debezium APIs. Snapshot splits are processed by a SnapshotSplitReader; the binlog split is processed by a BinlogSplitReader.
```java
public void addSplits(List<SplitT> splitsToAdd) {
    SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
    if (fetcher == null) {
        // No fetcher running yet: create one, queue the splits, then start it.
        fetcher = createSplitFetcher();
        fetcher.addSplits(splitsToAdd);
        startFetcher(fetcher);
    } else {
        fetcher.addSplits(splitsToAdd);
    }
}
```

Record Emission: MySqlRecordEmitter converts Debezium SourceRecord objects to Flink RowData (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER) using RowDataDebeziumDeserializeSchema. Watermark events are used to track the high watermark of each split.
```java
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
    Envelope.Operation op = Envelope.operationFor(record);
    Struct value = (Struct) record.value();
    if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
        GenericRowData insert = extractAfterRow(value, record.valueSchema());
        insert.setRowKind(RowKind.INSERT);
        out.collect(insert);
    } else if (op == Envelope.Operation.DELETE) {
        GenericRowData delete = extractBeforeRow(value, record.valueSchema());
        delete.setRowKind(RowKind.DELETE);
        out.collect(delete);
    } else {
        // An update produces a retraction pair: the old row as UPDATE_BEFORE,
        // the new row as UPDATE_AFTER.
        GenericRowData before = extractBeforeRow(value, record.valueSchema());
        before.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(before);
        GenericRowData after = extractAfterRow(value, record.valueSchema());
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(after);
    }
}
```

When a snapshot split finishes, the reader reports its high watermark and split ID to the enumerator, which aggregates this information to compute the starting offset for the subsequent binlog split.
```java
private MySqlBinlogSplit createBinlogSplit() {
    List<MySqlSnapshotSplit> assignedSnapshotSplit =
            snapshotSplitAssigner.getAssignedSplits().values().stream()
                    .sorted(Comparator.comparing(MySqlSplit::splitId))
                    .collect(Collectors.toList());
    MySqlSnapshotSplit lastSnapshotSplit =
            assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1);
    Map<String, BinlogOffset> splitFinishedOffsets =
            snapshotSplitAssigner.getSplitFinishedOffsets();
    // The binlog split starts from the smallest high watermark among all
    // finished snapshot splits, so no change can be missed.
    BinlogOffset minBinlogOffset = null;
    for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
        BinlogOffset offset = splitFinishedOffsets.get(split.splitId());
        if (minBinlogOffset == null || offset.compareTo(minBinlogOffset) < 0) {
            minBinlogOffset = offset;
        }
    }
    // finishedSnapshotSplitInfos and tableSchemas are collected elsewhere in the assigner.
    return new MySqlBinlogSplit(
            BINLOG_SPLIT_ID,
            lastSnapshotSplit.getSplitKeyType(),
            minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
            BinlogOffset.NO_STOPPING_OFFSET,
            finishedSnapshotSplitInfos,
            tableSchemas);
}
```

Overall, Flink CDC 2.0 introduces a robust architecture for distributed full-load reading, precise checkpointing, and consistent incremental synchronization, making it suitable for large-scale change data capture scenarios.
TAL Education Technology
TAL Education is a technology-driven education company committed to the mission of 'making education better through love and technology'. The TAL technology team has always been dedicated to educational technology research and innovation. This is the external platform of the TAL technology team, sharing weekly curated technical articles and recruitment information.