Big Data 19 min read

Enterprise Data Lake Architecture, Delta Lake Core Capabilities, and Stream‑Batch Integrated Analytics on Alibaba Cloud

This article explains the rapid growth of data, the limitations of traditional warehouses, and how a cloud‑based data lake built on object storage with Delta Lake format provides low‑cost, flexible, and ACID‑compliant analytics, followed by a step‑by‑step guide to ingest, manage, and analyze data using Alibaba Cloud DLF and Databricks DDI with Spark streaming and batch jobs.

Big Data Technology Architecture
Big Data Technology Architecture
Big Data Technology Architecture
Enterprise Data Lake Architecture, Delta Lake Core Capabilities, and Stream‑Batch Integrated Analytics on Alibaba Cloud

Background

With the continuous expansion of the data era, data volume has exploded and data formats have become increasingly diverse. Traditional data warehouse models suffer from high cost, slow response, and limited formats, prompting the emergence of data lakes that offer lower cost, richer data forms, and more flexible analytical computing.

Data Lake Architecture and Key Technologies

A data lake is a centralized storage repository that supports structured, semi‑structured, and unstructured data from sources such as relational databases, log systems, NoSQL stores, and message queues. It stores data in cost‑effective object storage (e.g., OSS) and provides a unified data catalog for multiple compute engines, solving data silos and reducing storage and usage costs.

Storage and Formats

The lake primarily uses cloud object storage for its low cost, high stability, and scalability. ACID‑compatible lake formats such as Delta Lake, Apache Hudi, and Iceberg provide metadata management, support UPDATE/DELETE, and enable real‑time data updates in batch‑stream integrated scenarios. The article focuses on Delta Lake.

Core Capabilities of Delta Lake

ACID Transactions: Each write is a transaction recorded in a transaction log with optimistic concurrency control, providing serializable isolation and snapshot reads.

Schema Management: Automatic schema validation, DDL for adding columns, and automatic schema evolution.

Scalable Metadata: Metadata stored in the transaction log enables fast directory listing and efficient reads.

Time Travel: Historical snapshots can be accessed by timestamp or version, facilitating reproducibility and rollback.

Unified Batch‑Stream: Delta Lake can serve as a streaming sink for Structured Streaming, supporting near‑real‑time analytics without complex pipelines.

Upserts and Deletes: DML commands (MERGE, UPDATE, DELETE) allow efficient row‑level changes without rewriting whole partitions.

Data Lake Construction and Management

1. Data Ingestion

Enterprise data resides in MySQL, SLS logs, HBase, Kafka, etc. Batch tools (DataX, Sqoop) and real‑time pipelines (Kafka + Spark Streaming/Flink) are used to sync data to low‑cost object storage. Alibaba Cloud DLF provides a one‑stop ingestion solution.

2. Unified Metadata Service

Object storage lacks analytical semantics, so a Hive Metastore‑like service provides unified metadata for various compute engines, supporting automatic metadata extraction, unified permissions, and compatibility with open‑source ecosystems.

Data Lake Compute and Analysis

Unlike traditional warehouses, a data lake can connect to multiple engines (Hive, Spark, Presto, Flink, MaxCompute, Hologres). Acceleration services improve performance and reduce bandwidth costs.

Databricks Data Insight (DDI) – Commercial Spark Engine

DDI is a fully managed Spark analysis engine on Alibaba Cloud that offers SaaS‑style operation, full Spark stack integration, cost reduction through storage‑compute separation, and 24/7 SLA support.

Practical Workflow: DDI + DLF Stream‑Batch Integrated Analysis

Prerequisites

Enable DLF, OSS, Kafka, DDI, RDS, DTS services in the same region.

Create a MySQL database dlfdb and a user with read/write privileges.

1. Create Data Source

In the DLF console, navigate to Data Ingestion → Data Source Management and add a new source pointing to the RDS instance.

2. Create Metadata Database

Create an OSS bucket (e.g., databricks-data-source) and a metadata database dlf/ via the Metadata Management UI.

3. Create Ingestion Task

Configure a real‑time relational database ingestion task, select the newly created source, set the target path dlf/engine_funcs, and enable DTS subscription.

4. Create Databricks DDI Cluster

After cluster creation, add the client IP to the whitelist, then open a Notebook for interactive queries on the Delta tables.

5. Streaming Job (Spark Structured Streaming)

%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

%spark
import org.apache.spark.sql._
import io.delta.tables._

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  microBatchOutputDF.createOrReplaceTempView("dataStream")
  val df = microBatchOutputDF.sparkSession.sql("""
    select `sn`,
      stack(7, 'temperature', `temperature`, 'speed', `speed`, 'runTime', `runTime`, 'pressure', `pressure`, 'electricity', `electricity`, 'flow', `flow`, 'dia', `dia`) as (`name`, `value`)
    from dataStream
  """)
  df.createOrReplaceTempView("updates")
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO delta_aggregates_metrics t USING updates s
    ON s.sn = t.sn AND s.name = t.name
    WHEN MATCHED THEN UPDATE SET t.value = s.value, t.update_time = current_timestamp()
    WHEN NOT MATCHED THEN INSERT (sn, name, value, create_time, update_time) VALUES (s.sn, s.name, s.value, current_timestamp(), current_timestamp())
  """)
}

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

def getquery(checkpoint_dir: String, servers: String, topic: String) = {
  val streamingInputDF = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("subscribe", topic)
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

  val streamingSelectDF = streamingInputDF.select(
    get_json_object(col("value").cast("string"), "$.sn").alias("sn"),
    get_json_object(col("value").cast("string"), "$.temperature").alias("temperature"),
    get_json_object(col("value").cast("string"), "$.speed").alias("speed"),
    get_json_object(col("value").cast("string"), "$.runTime").alias("runTime"),
    get_json_object(col("value").cast("string"), "$.electricity").alias("electricity"),
    get_json_object(col("value").cast("string"), "$.flow").alias("flow"),
    get_json_object(col("value").cast("string"), "$.dia").alias("dia"),
    get_json_object(col("value").cast("string"), "$.pressure").alias("pressure")
  )

  streamingSelectDF.writeStream.format("delta")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .foreachBatch(upsertToDelta _)
    .outputMode("update")
    .start()
}

%spark
val my_checkpoint_dir = "oss://databricks-data-source/checkpoint/ck"
val servers = "***.***.***.***:9092"
val topic = "your-topic"
getquery(my_checkpoint_dir, servers, topic)

6. Batch Job

%spark
// Read real‑time updated tables
val aggregateDF = spark.table("log_data_warehouse_dlf.delta_aggregates_metrics")
val rdsData = spark.table("fjl_dlf.engine_funcs_delta").drop("create_time", "update_time")

val aggS = aggregateDF.withColumnRenamed("value", "esn_value")
  .withColumnRenamed("name", "engine_serial_name")
  .withColumnRenamed("sn", "engine_serial_number")

val aggT = aggregateDF.withColumnRenamed("value", "tesn_value")
  .withColumnRenamed("name", "target_engine_serial_name")
  .withColumnRenamed("sn", "target_engine_serial_number")
  .drop("create_time", "update_time")

val resultDF = rdsData.join(aggS, Seq("engine_serial_name", "engine_serial_number"), "left")
  .join(aggT, Seq("target_engine_serial_number", "target_engine_serial_name"), "left")
  .selectExpr("engine_serial_number", "engine_serial_name", "esn_value", "target_engine_serial_number", "target_engine_serial_name", "tesn_value", "operator", "create_time", "update_time")

resultDF.show(false)
resultDF.write.format("delta").mode("append").saveAsTable("log_data_warehouse_dlf.delta_result")

7. Performance Optimization

Streaming generates many small files that degrade read performance. Use Delta Lake's OPTIMIZE to compact files and Z‑ORDER BY to improve query speed. Schedule hourly OPTIMIZE and VACUUM to clean up old files:

OPTIMIZE log_data_warehouse_dlf.delta_result ZORDER BY engine_serial_number;
VACUUM log_data_warehouse_dlf.delta_result RETAIN 1 HOURS;

References

Delta Lake Fundamentals and Performance (PDF)

Databricks blog on processing petabytes in seconds

Alibaba Cloud documentation: https://help.aliyun.com/document_detail/148379.html

Alibaba CloudSparkDelta Lake
Big Data Technology Architecture
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.