Implementing Real-Time Data Ingestion with Delta Lake on EMR: Architecture, Challenges, and Solutions
This article describes how Soul's data engineering team replaced nightly batch ETL with real-time Delta Lake ingestion on EMR. It covers the motivation, a comparison of Delta Lake, Apache Hudi, and Apache Iceberg, the implementation architecture, the problems encountered (such as data skew and schema evolution), and the solutions adopted to improve performance and reliability.
Background
In the traditional offline data-warehouse model, ETL is the first stage logs pass through before they are stored in the warehouse. Soul's event-tracking logs are massive: they produce more than 1,200 dynamic partitions per day, and individual partitions range from tens of thousands to billions of rows. The previous pipeline wrote logs to Kafka, collected them into HDFS with Flume, and then ran a day-level Spark ETL job to load the data into Hive. The job took 2–3 hours: about 1 hour of processing and more than 1 hour of loading.
This design had three problems: the long day-level ETL delayed data availability for downstream consumers, the nightly batch window consumed heavy cluster resources, and unstable ETL jobs often required troubleshooting in the middle of the night.
Why Delta?
To cut ETL latency and resource cost, the team moved from T+1 batch ETL to T+0 real-time ingestion, aiming to make data available immediately while keeping it consistent. A Lambda architecture with separate batch and real-time stores had been tried, but it lacked transactional guarantees, produced many small files, and delivered poor query performance.
The team evaluated three popular table-format data-lake solutions: Delta Lake (in both open-source and commercial editions), Apache Hudi, and Apache Iceberg. Delta Lake offers native Spark streaming source support and SQL support on Spark 3.0, but it is tightly coupled to Spark and requires manual compaction. Hudi provides fast primary-key upserts and automatic compaction, but its API is more complex. Iceberg is engine-agnostic, but at the time of evaluation it was still maturing and its merge operations were costly.
Alibaba Cloud contributed an EMR-optimized version of Delta Lake with integrated Spark SQL and Spark Streaming support, automatic Hive metastore synchronization, auto-compaction, and performance optimizations such as Z-ordering and data skipping.
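Z-ordering and data skipping work together: sorting rows along a Z-order (Morton) curve keeps rows that are close in several columns in the same files, so per-file min/max statistics can rule out more files for a filter. The following is a minimal, self-contained Python sketch of that idea, not EMR Delta's implementation; the 8x8 grid, the four-row "files", and all function and class names are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

Row = Tuple[int, int]

def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Z-order (Morton) code: interleave the bits of two non-negative ints."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # bits of y go to odd positions
    return z

@dataclass
class DataFile:
    """A hypothetical data file plus the min/max column stats used for data skipping."""
    rows: List[Row]
    min_x: int
    max_x: int
    min_y: int
    max_y: int

def write_files(rows: List[Row], rows_per_file: int, key: Callable) -> List[DataFile]:
    """Sort rows by `key`, cut them into fixed-size files, and record per-file stats."""
    ordered = sorted(rows, key=key)
    files = []
    for start in range(0, len(ordered), rows_per_file):
        chunk = ordered[start:start + rows_per_file]
        xs = [r[0] for r in chunk]
        ys = [r[1] for r in chunk]
        files.append(DataFile(chunk, min(xs), max(xs), min(ys), max(ys)))
    return files

def files_to_scan(files: List[DataFile], x: Optional[int] = None,
                  y: Optional[int] = None) -> List[DataFile]:
    """Data skipping: keep only files whose min/max range can match the equality filters."""
    return [
        f for f in files
        if (x is None or f.min_x <= x <= f.max_x)
        and (y is None or f.min_y <= y <= f.max_y)
    ]

if __name__ == "__main__":
    grid = [(x, y) for x in range(8) for y in range(8)]
    z_files = write_files(grid, 4, key=lambda r: interleave_bits(r[0], r[1]))
    linear_files = write_files(grid, 4, key=lambda r: r)  # plain sort on (x, y)
    print(len(files_to_scan(z_files, y=2)), len(files_to_scan(linear_files, y=2)))  # 4 8
```

Filtering on the second column y alone, the Z-ordered layout scans 4 of 16 files while a plain sort on (x, y) scans 8. The trade-off is that a filter on x alone now scans 4 files instead of 2, which is why Z-ordering suits tables queried on several columns.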
Implementation
During testing, the team reported several EMR Delta bugs that were subsequently resolved, including missing Hive mapping tables, incompatibilities with the Tez engine, and inconsistent query results between Presto and Tez. In the final architecture, Spark consumes event logs from Kafka and writes them to Delta tables on HDFS in one-minute micro-batches; Hive mapping tables are created automatically, so the data can be queried with Hive on MapReduce, Tez, or Presto.
The team packaged a generic, Spark-based ETL tool so that ingestion is configured rather than coded. Features added on top include Iceberg-style hidden partitions, regular-expression validation of dynamic partition values, custom event-time fields, a configurable parsing depth for nested JSON, and SQL-based dynamic partition configuration to mitigate data skew.
The tool was integrated into Soul's data platform: users request log ingestion, obtain approval, and configure parameters through a UI, which simplifies creating real-time Hive tables.
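The configuration-driven features can be illustrated with a small Python sketch of how a single log line might be routed: validating the dynamic partition value with a regular expression, deriving a hidden date partition from a configurable event-time field, and flattening nested JSON to a configurable depth. This is not the team's tool; the config keys, field names, and the `dt` partition name are hypothetical.

```python
import json
import re
from datetime import datetime, timezone

# Hypothetical ingestion config; field names and the pattern are illustrative only.
CONFIG = {
    "event_time_field": "client_ts",               # custom event-time field, epoch millis
    "partition_field": "event_type",               # dynamic partition column
    "partition_pattern": r"[a-z][a-z0-9_]{0,63}",  # allowed partition values
    "max_json_depth": 2,                           # nesting levels to flatten into columns
}

def flatten(obj, max_depth, prefix="", depth=1):
    """Flatten nested dicts into prefix_key columns down to max_depth levels;
    anything nested deeper is kept as a JSON string column."""
    out = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict) and depth < max_depth:
            out.update(flatten(value, max_depth, f"{name}_", depth + 1))
        elif isinstance(value, dict):
            out[name] = json.dumps(value, sort_keys=True)
        else:
            out[name] = value
    return out

def route(raw_line, cfg=CONFIG):
    """Parse one log line into (partition values, flattened record), or return None
    when the dynamic partition value fails validation."""
    record = json.loads(raw_line)
    part = str(record.get(cfg["partition_field"], ""))
    if not re.fullmatch(cfg["partition_pattern"], part):
        return None  # reject bad values instead of creating junk partitions
    ts_ms = int(record[cfg["event_time_field"]])
    # Hidden partition: `dt` is derived from event time, not sent by the producer.
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")
    return {"dt": dt, cfg["partition_field"]: part}, flatten(record, cfg["max_json_depth"])

if __name__ == "__main__":
    line = '{"event_type": "app_open", "client_ts": 1609459200000, "ctx": {"os": {"name": "ios"}}}'
    print(route(line))
    print(route('{"event_type": "Bad Value!", "client_ts": 0}'))  # None
```

With `max_json_depth` set to 2, `ctx.os` becomes a single `ctx_os` column holding `{"name": "ios"}` as a JSON string, which caps the number of columns a deeply nested payload can create.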
To address the small-file problem, the team used EMR Delta's OPTIMIZE command to merge files and VACUUM to clean up obsolete ones, with an auto-compaction policy that triggers when the number of files exceeds a threshold.
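The threshold-triggered compaction policy can be sketched as a planning step: do nothing until a partition accumulates enough small files, then group them into batches close to a target file size. This is a generic greedy bin-packing sketch in Python, not EMR Delta's auto-compaction logic; the parameters `min_files` and `target_bytes` and the example sizes are hypothetical.

```python
from typing import List

MB = 1024 * 1024

def plan_compaction(file_sizes: List[int], min_files: int, target_bytes: int) -> List[List[int]]:
    """Return groups of file indexes to merge into one output file each.
    Returns [] when fewer than `min_files` small files exist (compaction not triggered)."""
    small = [i for i, size in enumerate(file_sizes) if size < target_bytes]
    if len(small) < min_files:
        return []
    groups, current, current_bytes = [], [], 0
    # Greedy bin packing, smallest files first: fill each output file up to target_bytes.
    for idx in sorted(small, key=lambda i: file_sizes[i]):
        if current and current_bytes + file_sizes[idx] > target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += file_sizes[idx]
    if current:
        groups.append(current)
    # Rewriting a single file on its own would not reduce the file count, so drop those groups.
    return [g for g in groups if len(g) > 1]

if __name__ == "__main__":
    sizes = [s * MB for s in (10, 20, 5, 300, 40, 60, 90)]
    print(plan_compaction(sizes, min_files=5, target_bytes=128 * MB))  # [[2, 0, 1, 4]]
```

The 300 MB file is already above the target and is left alone, and the 60 MB and 90 MB files end up in single-file groups that are skipped; a production planner might use a smarter packing order.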
Issues & Solutions
Data skew from uneven dynamic partitions: Repartitioning by the partition column reduced the number of files written per batch, but it sent every row of a partition to a single task, so a few very large partitions dominated batch runtime. A custom, SQL-driven repartition-by-salt approach split each large partition across several tasks and files, cutting the runtime of the slowest partition from 3 minutes to 40 seconds.
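The salting idea can be modeled in a few lines of Python: give each large partition several salt values sized to a target row count per task, so that repartitioning on (partition, salt) rather than on the partition alone spreads a hot partition across several tasks and output files. In Spark this would correspond roughly to adding a salt column and calling `DataFrame.repartition` on the partition columns plus the salt; the team's actual SQL-driven configuration is not shown in the source, and the function names, `rows_per_task`, and event names here are hypothetical.

```python
import math
from collections import Counter, defaultdict

def salt_buckets(counts, rows_per_task):
    """Salt values per partition: small partitions keep one task,
    large ones are split so no task gets much more than rows_per_task rows."""
    return {p: max(1, math.ceil(n / rows_per_task)) for p, n in counts.items()}

def assign_tasks(rows, rows_per_task):
    """Group (partition, payload) rows by (partition, salt), mimicking a repartition on both."""
    counts = Counter(p for p, _ in rows)
    buckets = salt_buckets(counts, rows_per_task)
    seen = Counter()
    tasks = defaultdict(list)
    for partition, payload in rows:
        # Round-robin salt within the partition keeps the split even; random salts also work.
        salt = seen[partition] % buckets[partition]
        seen[partition] += 1
        tasks[(partition, salt)].append(payload)
    return tasks

if __name__ == "__main__":
    rows = [("app_open", i) for i in range(10_000)] + [("rare_event", i) for i in range(50)]
    tasks = assign_tasks(rows, rows_per_task=2_500)
    print({k: len(v) for k, v in sorted(tasks.items())})
    # {('app_open', 0): 2500, ('app_open', 1): 2500, ('app_open', 2): 2500,
    #  ('app_open', 3): 2500, ('rare_event', 0): 50}
```

Without the salt, the 10,000 `app_open` rows would land in a single task; with it, the largest task holds 2,500 rows while the small `rare_event` partition still produces a single file.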
Dynamic schema evolution: A metadata service was introduced that detects new fields while the DataFrame is being constructed, updates the stored metadata, and rebuilds the schema. New columns are then added to the Delta table automatically, and the Hive mapping is kept in sync.
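A minimal Python sketch of the detect-and-extend step, assuming an append-only schema in which new columns are only ever added at the end and existing ones are never retyped. The `SchemaRegistry` class and the coarse type mapping are hypothetical stand-ins for the metadata service; in a real deployment the returned additions would then drive the Delta schema update (for example, open-source Delta's `mergeSchema` write option) and an `ALTER TABLE ... ADD COLUMNS` on the Hive mapping.

```python
def infer_type(value):
    """Map a JSON value to a coarse column type (an assumption; real services are stricter)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    return "string"

class SchemaRegistry:
    """Toy stand-in for the metadata service: an ordered, append-only column map per table."""

    def __init__(self):
        self.tables = {}

    def evolve(self, table, records):
        """Add columns seen in records but missing from the stored schema; return the new ones."""
        schema = self.tables.setdefault(table, {})
        added = {}
        for record in records:
            for name, value in record.items():
                # Skip nulls: a column is added once a typed value is seen.
                if name not in schema and name not in added and value is not None:
                    added[name] = infer_type(value)
        schema.update(added)  # dicts keep insertion order, so existing positions stay stable
        return added

if __name__ == "__main__":
    registry = SchemaRegistry()
    print(registry.evolve("events", [{"uid": 1, "event_type": "app_open"}]))
    print(registry.evolve("events", [{"uid": 2, "event_type": "like", "duration": 1.5}]))
    print(list(registry.tables["events"]))
```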
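Duplicate data caused by Kafka offset commit timing: When Kafka offsets are committed separately from the Delta write, a failure between the two steps causes the same batch to be processed and written again on restart. The team addressed this either by switching to Structured Streaming, whose Delta sink supports exactly-once writes, or by managing offsets externally to avoid reprocessing.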
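The exactly-once approach rests on one idea: record which batch has been written in the same atomic commit as the data, so that a batch replayed after a crash can be recognized and skipped. The Delta streaming sink does this by recording the query's batch ID in the table's transaction log; the Python sketch below models the same idea with a hypothetical `ToyTable` and is not Delta's code.

```python
class ToyTable:
    """Toy table whose commit stores rows plus an (app_id -> last batch_id) watermark,
    standing in for a transaction log. All names are hypothetical."""

    def __init__(self):
        self.rows = []
        self.committed = {}

    def commit(self, app_id, batch_id, rows):
        """Append rows unless this app already committed batch_id; return True if written."""
        if batch_id <= self.committed.get(app_id, -1):
            return False  # replayed batch after a crash: skip instead of duplicating
        # In a real table format these two updates land in one atomic log entry.
        self.rows.extend(rows)
        self.committed[app_id] = batch_id
        return True

def run_batches(table, app_id, batches):
    """Write (batch_id, rows) pairs, e.g. after restarting from stale Kafka offsets."""
    return [table.commit(app_id, batch_id, rows) for batch_id, rows in batches]

if __name__ == "__main__":
    table = ToyTable()
    run_batches(table, "ingest", [(0, ["a", "b"]), (1, ["c"])])
    # Simulated crash before Kafka offsets were committed: batch 1 is replayed.
    print(run_batches(table, "ingest", [(1, ["c"]), (2, ["d"])]))  # [False, True]
    print(table.rows)  # ['a', 'b', 'c', 'd']
```

Because the watermark travels with the data, it no longer matters when the Kafka consumer commits its own offsets; a replay is harmless rather than a source of duplicates.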
Metadata parsing overhead in queries: As Delta tables grow, reading and parsing table metadata adds latency to every query; Alibaba Cloud optimized the query path with caching to reduce it.
Potential CDC use case: Delta supports updates and deletes, but the current workload's high-frequency updates spanning many partitions raise concerns about the performance of MERGE-based CDC. Ongoing work on join pruning and Bloom filters aims to address this.
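One common way to keep metadata reads cheap is to cache the reconstructed table state keyed by log version and, when new commits arrive, replay only the new log entries. The source does not describe Alibaba Cloud's optimization in detail, so the Python sketch below is a generic illustration with a hypothetical in-memory log of `add`/`remove` actions, not the EMR implementation.

```python
from typing import Dict, FrozenSet, List, Optional, Tuple

Action = Tuple[str, str]  # ("add" | "remove", file path)

class CachedSnapshotReader:
    """Rebuilds the set of live data files from a version -> actions log,
    caching the result and replaying only versions newer than the cache."""

    def __init__(self, log: Dict[int, List[Action]]):
        self.log = log
        self.versions_read = 0  # counts log entries actually replayed
        self._cache: Optional[Tuple[int, FrozenSet[str]]] = None

    def snapshot(self) -> FrozenSet[str]:
        latest = max(self.log, default=-1)
        if self._cache is not None and self._cache[0] == latest:
            return self._cache[1]  # cache hit: no log parsing at all
        start, files = 0, set()
        if self._cache is not None:
            start, files = self._cache[0] + 1, set(self._cache[1])
        for version in range(start, latest + 1):
            self.versions_read += 1
            for op, path in self.log.get(version, []):
                if op == "add":
                    files.add(path)
                else:
                    files.discard(path)
        self._cache = (latest, frozenset(files))
        return self._cache[1]

if __name__ == "__main__":
    log = {0: [("add", "f1")], 1: [("add", "f2")]}
    reader = CachedSnapshotReader(log)
    reader.snapshot()
    reader.snapshot()                          # served from cache
    log[2] = [("remove", "f1"), ("add", "f3")]
    print(sorted(reader.snapshot()), reader.versions_read)  # ['f2', 'f3'] 3
```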
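Bloom filters help MERGE by pruning files: if each data file carries a Bloom filter over its key column, a merge only needs to read and rewrite files whose filter may contain one of the incoming keys, and because Bloom filters have no false negatives, skipping the rest is safe. This Python sketch is a generic illustration of that idea, not Alibaba Cloud's work in progress; the file names, key format, filter size, and hash count are hypothetical.

```python
import hashlib
from typing import Dict, Iterable, List

class BloomFilter:
    """Bloom filter with k salted SHA-256 hashes over an m-bit integer bitmap."""

    def __init__(self, m_bits: int = 8192, k: int = 3):
        self.m, self.k, self.bits = m_bits, k, 0

    def _positions(self, key: str):
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(key))

def files_to_rewrite(file_keys: Dict[str, Iterable[str]], update_keys: List[str]) -> List[str]:
    """Return the files a key-based MERGE must touch; files whose filter rules out
    every update key are skipped without being read."""
    touched = []
    for name, keys in sorted(file_keys.items()):
        bf = BloomFilter()
        for key in keys:
            bf.add(key)  # in practice built once at write time and stored with the file
        if any(bf.might_contain(key) for key in update_keys):
            touched.append(name)
    return touched

if __name__ == "__main__":
    files = {f"part-{p}": [f"u{i}" for i in range(p * 100, p * 100 + 100)] for p in range(3)}
    print(files_to_rewrite(files, ["u5", "u150"]))
```

A false positive only costs an unnecessary file rewrite, never a missed update, which is why the filter size can be tuned for cost rather than correctness.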
Future Plans
1. Extend the use of Delta Lake to further optimize the real-time data-warehouse architecture and reduce latency for key business metrics.
2. Integrate the internal metadata platform to provide end‑to‑end lineage and governance for log ingestion, real‑time storage, and metadata management.
3. Continue performance tuning of Delta queries, exploring advanced features such as Z‑ordering to accelerate ad‑hoc analysis.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
