Design and Implementation of CDC‑Based Real‑Time Data Ingestion with Delta Lake on Alibaba Cloud EMR
This article describes how FluentSpeak replaced a DataX master‑slave pipeline with a CDC‑plus‑Delta Lake solution on Alibaba Cloud EMR, detailing architecture choices, streaming SQL merge logic, monitoring, challenges, and the resulting cost and latency improvements.
Background
FluentSpeak's offline computation tasks rely on business databases as their data sources; the accuracy, stability, and timeliness of database ingestion determine the correctness of the downstream pipeline. Previously, DataX wrapped in a master‑slave architecture handled DB ingestion, while a read‑only replica with a Presto connector provided near‑real‑time queries.
Technical Solution Selection
Various CDC‑based ingestion options were evaluated: CDC+Merge, CDC+Hudi, CDC+Delta Lake, and CDC+Iceberg. CDC+Merge was discarded because it could not satisfy near‑real‑time query needs, and Iceberg was deemed immature. The remaining candidates, CDC+Hudi and CDC+Delta Lake, offered similar functionality; selection criteria included stability, small‑file handling, SQL support, cloud‑vendor support, and language support.
| Consideration | Hudi | Delta Lake |
| --- | --- | --- |
| Stability | Stable | Stable |
| Small‑file merging | Automatic | Manual |
| SQL support | Not supported | Supported |
| Cloud vendor support | Alibaba Cloud not supported at selection time; AWS supported with custom development | Alibaba Cloud supported and integrated into EMR, with StreamingSQL support |
| Language support | Java, Scala | Java, Scala, Python, SQL |
Because the whole data platform is built on Alibaba Cloud EMR, Delta Lake required far less integration effort, so the CDC+Delta Lake solution was chosen.
Overall Architecture
Data is divided into historical data and new data. Historical data is imported once via DataX from MySQL into OSS. New data is captured in near real‑time using MySQL binlog, Debezium, and written to Delta Lake tables. Each night, before the ETL job runs, a Merge operation combines historical and new data.
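As a concrete illustration of the capture side, a Debezium MySQL connector registered through Kafka Connect might look like the sketch below. This is not the team's actual configuration: hostnames, credentials, and database names are placeholders, and the RegexRouter transform is one standard way to rewrite Debezium's default `{prefix}.{db}.{table}` topic names into per‑table `cdc-{db}-{table}` topics.

```python
# Hedged sketch of a Debezium MySQL source connector config (placeholders
# throughout; schema-history settings omitted for brevity). The RegexRouter
# SMT renames "cdc.{db}.{table}" topics to "cdc-{db}-{table}".
connector_config = {
    "name": "mysql-cdc-source",  # illustrative connector name
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql.internal",   # placeholder
        "database.port": "3306",
        "database.user": "cdc_user",             # placeholder
        "database.password": "********",
        "database.server.id": "184054",          # unique binlog client id
        "database.include.list": "app_db",       # placeholder
        "topic.prefix": "cdc",                   # "database.server.name" on older Debezium
        # Route "cdc.{db}.{table}" -> "cdc-{db}-{table}"
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$1-$2-$3",
    },
}
```

The routing transform is what makes the one‑topic‑per‑table layout in the next section possible without custom code.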
Delta Lake Data Ingestion
Debezium captures MySQL binlog and publishes each table to a Kafka topic (one topic per table). Kafka source tables are created with the following DDL:
CREATE TABLE `kafka_{db_name}_{table_name}` (`key` BINARY, `value` BINARY, `topic` STRING, `partition` INT, `offset` BIGINT, `timestamp` TIMESTAMP, `timestampType` INT)
USING kafka
OPTIONS (
`kafka.sasl.mechanism` 'PLAIN',
`subscribe` 'cdc-{db_name}-{table_name}',
`serialization.format` '1',
`kafka.sasl.jaas.config` '*********(redacted)',
`kafka.bootstrap.servers` '{bootstrap-servers}',
`kafka.security.protocol` 'SASL_PLAINTEXT'
);The value field contains JSON with before/after payloads, operation type, and timestamp:
{
"payload": {
"before": { ... },
"after": { ... },
"source": { ... },
"op": "c",
"ts_ms": ...
}
}

Corresponding Delta Lake tables are created as follows:
CREATE TABLE IF NOT EXISTS delta.delta_{db_name}_{table_name}(
{row_key_info},
ts_ms bigint,
json_record string,
operation_type string,
offset bigint
) USING delta
LOCATION '------/delta/{db_name}.db/{table_name}';

StreamingSQL reads the Kafka source, extracts the offset, value, and CDC fields, and performs a 5‑minute mini‑batch MERGE into the Delta Lake table. The MERGE logic handles inserts, updates, and deletes based on the operation type.
CREATE SCAN incremental_{db_name}_{table_name} ON kafka_{db_name}_{table_name} USING STREAM
OPTIONS(
startingOffsets='earliest',
maxOffsetsPerTrigger='1000000',
failOnDataLoss=false
);
CREATE STREAM job
OPTIONS(
checkpointLocation='------/delta/{db_name}.db/{table_name}_checkpoint',
triggerIntervalMs='300000'
);
MERGE INTO delta.delta_{db_name}_{table_name} AS target
USING (
SELECT * FROM (
SELECT ts_ms, offset, operation_type, {key_column_sql},
COALESCE(after_record, before_record) AS after_record,
ROW_NUMBER() OVER (PARTITION BY {key_column_partition_sql} ORDER BY ts_ms DESC, offset DESC) AS rank
FROM (
SELECT ts_ms, offset, operation_type, before_record, after_record, {key_column_include_sql}
FROM (
SELECT get_json_object(string(value), '$.payload.op') AS operation_type,
get_json_object(string(value), '$.payload.before') AS before_record,
get_json_object(string(value), '$.payload.after') AS after_record,
get_json_object(string(value), '$.payload.ts_ms') AS ts_ms,
offset
FROM incremental_{db_name}_{table_name}
) binlog
) binlog_wo_init
) binlog_rank
WHERE rank = 1
) AS source
ON {key_column_condition_sql}
WHEN MATCHED AND (source.operation_type = 'u' OR source.operation_type = 'd') THEN
UPDATE SET {set_key_column_sql}, ts_ms = source.ts_ms, json_record = source.after_record,
operation_type = source.operation_type, offset = source.offset
WHEN NOT MATCHED AND (source.operation_type = 'c' OR source.operation_type = 'u' OR source.operation_type = 'd') THEN
INSERT ({insert_key_column_sql}, ts_ms, json_record, operation_type, offset)
VALUES ({insert_key_column_value_sql}, source.ts_ms, source.after_record, source.operation_type, source.offset);

After the StreamingSQL job runs, Delta Lake stores the data as Parquet files (e.g., part‑xxxx.snappy.parquet) with transaction logs under _delta_log. Because Delta Lake does not automatically compact small files, an hourly OPTIMIZE and VACUUM merge files and purge versions older than one hour.
OPTIMIZE delta_{db_name}_{table_name};
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta_{db_name}_{table_name} RETAIN 1 HOURS;

Hive and Presto cannot directly read Spark‑created Delta tables, so external Hive/Presto tables are generated to point at the same data locations.
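The streaming MERGE above can be sketched in plain Python (illustrative names, with a dict standing in for the Delta table). Within a mini‑batch, only the latest event per primary key survives (ordered by ts_ms, then offset), and note that a delete does not physically remove the row: it is kept with operation_type 'd' and filtered out later by the nightly merge.

```python
def latest_per_key(events):
    """Mimic ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts_ms DESC, offset DESC)
    ... WHERE rank = 1: keep only the newest event per primary key."""
    best = {}
    for e in events:
        k = e["key"]
        if k not in best or (e["ts_ms"], e["offset"]) > (best[k]["ts_ms"], best[k]["offset"]):
            best[k] = e
    return best

def merge_batch(table, events):
    """Apply one mini-batch to `table` (a dict keyed by primary key),
    following the MATCHED / NOT MATCHED branches of the MERGE above."""
    for k, e in latest_per_key(events).items():
        # COALESCE(after_record, before_record): deletes carry only "before"
        record = e["after"] if e["after"] is not None else e["before"]
        row = {"json_record": record, "operation_type": e["op"],
               "ts_ms": e["ts_ms"], "offset": e["offset"]}
        if k in table and e["op"] in ("u", "d"):
            table[k] = row                       # WHEN MATCHED: update in place
        elif k not in table and e["op"] in ("c", "u", "d"):
            table[k] = row                       # WHEN NOT MATCHED: insert
    return table
```

Keeping deleted rows tagged 'd' lets the MERGE stay append‑friendly; the nightly merge is the single place where deletes actually drop rows.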
Delta Lake Data & Historical Data Merge
Each night a merge task combines CDC data with historical DataX data. The task metadata (task name, DB name, table name, Hive table name, etc.) is stored in a MySQL table, and an Airflow DAG is automatically generated to run the Spark‑SQL merge script.
CREATE DATABASE IF NOT EXISTS {db_name} LOCATION '------/delta/{db_name}.db';
DROP TABLE IF EXISTS `{db_name}`.`{table_name}`;
CREATE TABLE IF NOT EXISTS `{db_name}`.`{table_name}`(
{table_column_infos}
) STORED AS PARQUET
LOCATION '------/delta/{db_name}.db/{table_name}/data_date=${{data_date}}';
INSERT OVERWRITE TABLE `{db_name}`.`{table_name}`
SELECT {table_columns}
FROM (
SELECT {table_columns}, _operation_type,
ROW_NUMBER() OVER (PARTITION BY {row_keys} ORDER BY ts_ms DESC) AS rank_num_
FROM (
SELECT {delta_columns}, operation_type AS _operation_type, ts_ms
FROM delta_{db_name}_{table_name}
UNION ALL
SELECT {hive_columns}, 'c' AS _operation_type, 0 AS ts_ms
FROM `{db_name}`.`{table_name}_delta_history`
) union_rank
) ranked_data
WHERE rank_num_ = 1
AND _operation_type <> 'd';

Airflow schedules the merge; upon successful completion, downstream ETL jobs are triggered.
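The nightly merge's UNION‑and‑rank logic reduces to a few lines of Python (names are illustrative): historical rows enter the union with ts_ms = 0 and operation 'c', so any CDC event for the same key wins, and rows whose final state is a delete are dropped.

```python
def nightly_merge(history_rows, cdc_rows):
    """Union historical rows (ts_ms=0, op='c') with CDC rows, keep the
    latest version per key (rank 1 by ts_ms), and drop deleted rows."""
    best = {}
    tagged_history = [dict(r, ts_ms=0, op="c") for r in history_rows]
    for r in tagged_history + list(cdc_rows):
        k = r["key"]
        if k not in best or r["ts_ms"] > best[k]["ts_ms"]:
            best[k] = r
    # WHERE rank_num_ = 1 AND _operation_type <> 'd'
    return {k: r for k, r in best.items() if r["op"] != "d"}
```

Because every CDC event carries ts_ms > 0, the historical snapshot only survives for keys the binlog never touched.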
Monitoring
Two monitoring jobs ensure data freshness and completeness. The first checks Kafka offsets against Delta Lake primary keys every 15 minutes: it inserts the latest row_key into a MySQL monitor table and alerts if the corresponding Delta record is missing. The second compares MySQL max IDs or row counts with the Delta Lake side, using either a probe approach (max ID) or a count‑based approach, and raises alerts on mismatches.
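The two checks boil down to simple predicates; a hedged sketch (function names and the tolerance parameter are illustrative, not the team's actual monitor code):

```python
def freshness_ok(latest_mysql_key, delta_keys):
    """Probe check run every 15 minutes: the newest row_key written to the
    MySQL monitor table should already be visible in the Delta table."""
    return latest_mysql_key in delta_keys

def completeness_ok(mysql_value, delta_value, tolerance=0):
    """Compare MySQL max ID (probe approach) or row count (count-based
    approach) against the Delta side, allowing a small lag tolerance
    before alerting."""
    return (mysql_value - delta_value) <= tolerance
```

In practice an alert fires when either predicate is false for longer than one streaming trigger interval, to avoid paging on ordinary ingestion lag.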
Challenges
Frequent schema changes in source tables make CDC field parsing fragile; parsing is deferred to the nightly merge to avoid costly schema‑driven repairs.
Growing Delta Lake size degrades StreamingSQL performance; the current mitigation is to replace the Delta data with the latest merged snapshot and restart consumption, reducing the amount of data processed.
Hive and Presto cannot directly query Spark‑created Delta tables, requiring separate external tables and preventing seamless engine‑agnostic ETL.
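Deferring schema handling to the nightly merge, as described in the first challenge, amounts to projecting the raw json_record onto the current column list only at merge time. A minimal sketch (the function name is illustrative) of why this tolerates upstream schema drift:

```python
import json

def project_columns(json_record, columns):
    """Parse the raw json_record only at nightly-merge time: columns added
    upstream are ignored, columns not yet present come back as None, so a
    source-table ALTER never breaks the streaming ingestion path."""
    data = json.loads(json_record) if json_record else {}
    return {c: data.get(c) for c in columns}
```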
Benefits
DB replica cost reduced by approximately 80% after adopting CDC + Delta Lake.
Nightly DB ingestion time dramatically shortened, allowing most DB loads to finish within one hour.
Future Plans
Address StreamingSQL performance degradation as Delta Lake data volume continues to grow.
Enable Hive and Presto to query Spark‑created Delta tables directly, eliminating the need for duplicate external tables.
Liulishuo Tech Team
Help everyone become a global citizen!