Real‑time Data Warehouse Architecture and Technical Solution at Liulishuo
This article describes Liulishuo's migration to a Flink‑based real‑time data warehouse, covering background, benefits, technology selection (storage, Flink platform, dimension table connectors), overall architecture, concrete Hudi and Elasticsearch ingestion examples, processing SQL, and future outlook for unified batch‑streaming storage.
Background
Liulishuo initially built a real‑time data warehouse on Flink 1.9, using YARN for resource management and custom SQL‑Client modifications. With Flink 1.13 and mature SQL connectors, the platform was upgraded to Flink on Kubernetes, prompting a cascade of component upgrades.
What a real‑time data warehouse brings
- Data synchronization
- Second-level real-time processing
- Unified lake-on-top architecture for real-time and batch
Technical Solution Selection
Data storage scheme
Among Delta, Hudi and Iceberg, Hudi and Iceberg were compared because Flink is the compute engine. The comparison table shows cloud support, small‑file handling, stability, SQL support, update/delete capabilities, engine compatibility and language support.
| Comparison Dimension | Hudi | Iceberg |
| --- | --- | --- |
| Cloud vendor support | Alibaba Cloud, AWS | Alibaba Cloud |
| Small file merging | Automatic | V1 needs manual handling; V2 supports upsert but has merge issues |
| Stability | Stable | Stable |
| SQL support | Supported | Supported |
| Update/delete support | Supported | V2 supports upsert |
| Engine compatibility | Flink, Spark, Trino, Presto | Flink, Spark, Trino, Presto |
| Language support | Java, Scala, Python, SQL | Java, Scala, Python, SQL |
Because Iceberg V1 lacks upsert and V2 has small‑file merge problems, Hudi was selected as the lake storage solution.
Flink development platform
Aliyun EMR’s Hadoop stack incurred high operational cost. After evaluation, Alibaba Cloud Real‑time Computing Flink (VVP) was chosen for its Kubernetes‑based deployment, session/application modes, elastic scaling, interactive SQL UI, high availability, metadata management with Hive Metastore, custom connector and UDF support, OSS checkpointing, Prometheus monitoring, and multi‑version task handling.
Dimension table storage
Supported connectors include JDBC, HBase, and Elasticsearch. Their capabilities and drawbacks are summarized below.
| Name | Version | Source | Sink |
| --- | --- | --- | --- |
| JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
JDBC offers direct DB lookup without data sync but can stress the source under peak load. HBase provides high QPS via RowKey but only supports key‑based joins. Elasticsearch enables flexible queries and high QPS, but it is not officially supported as a dimension table and can be slower.
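For reference, a lookup join against a JDBC dimension table looks like the following sketch. The table, URL, and column names are placeholders, not taken from the article; the lookup-cache options are standard Flink JDBC connector settings used to reduce pressure on the source database.

```sql
-- Hypothetical MySQL dimension table accessed via lookup join
CREATE TABLE jdbc_dim_users (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/db_name',
  'table-name' = 'users',
  'username' = 'xxx',
  'password' = 'xxx',
  -- cache lookup results so peak traffic does not hammer the source DB
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '1min'
);

-- Enrich the stream with a processing-time lookup join
SELECT s.id, u.name
FROM kafka_stream_source AS s
JOIN jdbc_dim_users FOR SYSTEM_TIME AS OF s.proctime AS u
  ON s.id = u.id;
```

The cache trades a bounded amount of staleness (up to the TTL) for far fewer round trips to the database, which is the usual mitigation for the peak-load drawback noted above.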
Connector development
A custom Flink‑Elasticsearch connector was built to support both source (scan, lookup) and sink (streaming, batch) operations.
Overall Real‑time Warehouse Architecture
The architecture stores raw streams in Kafka and OSS (Hudi tables). VVP provides unified development, operation, and resource management. Flink handles real‑time processing, Trino enables interactive analytics, Superset generates reports, and Hive Metastore maintains metadata. (Architecture diagram omitted.)
Data Development Cases
Data Ingestion
CDC streams MySQL binlog to Kafka as Debezium JSON.
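As a sketch, such a topic can be exposed to Flink SQL as a changelog source table. The topic, broker address, and columns below are placeholders; `'format' = 'debezium-json'` is the standard Flink option that interprets the Debezium envelope as insert/update/delete rows.

```sql
-- Hypothetical Kafka source carrying Debezium-format binlog events
CREATE TABLE kafka_streaming_table (
  id BIGINT,
  xxx STRING,
  `data_date` VARCHAR(20),
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx:9092',
  'properties.group.id' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  -- decode the Debezium JSON envelope into changelog rows
  'format' = 'debezium-json'
);
```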
Hudi table ingestion
Flink SQL is used to create and populate the Hudi table; the DDL and DML statements follow.
-- Hudi table DDL statement
CREATE TABLE IF NOT EXISTS `catalog`.`db_name`.`table_name` (
id BIGINT,
xxx STRING,
`data_date` VARCHAR(20),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'table.type' = 'COPY_ON_WRITE',
'write.tasks' = '2',
'index.global.enabled' = 'true',
'index.bootstrap.enabled' = 'true',
'read.streaming.enabled' = 'true',
'oss.endpoint' = 'xxx',
'accessKeyId' = 'xxx',
'accessKeySecret' = 'xxx',
'path' = 'oss://xxx',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.db' = 'hudi',
'hive_sync.metastore.uris' = 'xxx',
'hive_sync.table' = 'xxx',
'compaction.tasks' = '1',
'hoodie.cleaner.commits.retained' = '10',
'hoodie.datasource.write.precombine.field' = 'updated_at',
'hoodie.datasource.write.recordkey.field' = 'id'
);
-- DML statement
INSERT INTO `catalog`.`db_name`.`table_name`
SELECT id, xxx, `data_date`
FROM kafka_streaming_table
WHERE id IS NOT NULL;

Key points: enable index bootstrap for initial uniqueness, define a PRIMARY KEY for upserts, set the pre-combine field, configure the Hive sync parameters, and retain a limited number of commit versions for cleanup.
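Because `'read.streaming.enabled'` is set in the DDL, downstream jobs can also consume this Hudi table incrementally. A minimal sketch, assuming the same table and a 4-second commit-polling interval (the interval is an assumption, not taken from the article):

```sql
-- Streaming (incremental) read of the Hudi table; poll for new commits every 4 s
SELECT id, xxx, `data_date`
FROM `catalog`.`db_name`.`table_name`
/*+ OPTIONS('read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4') */;
```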
Elasticsearch dimension table
The custom connector is packaged and registered on the Alibaba Cloud real‑time platform, then used to create and populate ES tables via Flink SQL.
-- ES table DDL
DROP TABLE IF EXISTS es_dimension01;
CREATE TABLE IF NOT EXISTS es_dimension01 (
id STRING,
xxx STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch6-lls',
'format' = 'json',
'hosts' = 'http://xxx:9200',
'index' = 'es_dimension01',
'username' = 'xxx',
'password' = 'xxx'
);
-- DML: read from Kafka and write to Elasticsearch
INSERT INTO es_dimension01
SELECT id, xxx FROM kafka_xxx_dimension;

The ES connector supports upsert by mapping the PRIMARY KEY to the document ID, allowing updates and deletions through ID-based operations.
Data Processing
A labeling job consumes Kafka streams, performs lookup joins with multiple ES dimension tables, uses an interval join for a fast‑changing dimension, and writes enriched records back to a Kafka sink. The full INSERT…SELECT statement is shown below.
INSERT INTO kafka_sink
SELECT
CAST(a.id AS BIGINT) AS id,
'label0' AS label,
CASE WHEN f.id IS NOT NULL THEN f.start_sec * 1000 ELSE e.start_sec * 1000 END AS sourceTime,
CASE WHEN f.id IS NOT NULL THEN f.end_sec * 1000 ELSE e.end_sec * 1000 END AS expiredAt,
a.updated_at AS updated,
'dmp' AS type
FROM kafka_stream_source a
JOIN es_llspay_order_items FOR SYSTEM_TIME AS OF a.proctime AS b ON a.id = b.id
JOIN es_dimension01 FOR SYSTEM_TIME AS OF a.proctime AS c ON b.id = c.id
JOIN es_dimension02 FOR SYSTEM_TIME AS OF a.proctime AS d ON a.id = d.id
LEFT JOIN es_dimension03 FOR SYSTEM_TIME AS OF a.proctime AS e ON e.id = d.id
LEFT JOIN kafka_stream_dimension01 f ON CAST(f.id AS STRING) = d.id
AND a.proctime BETWEEN f.proctime - INTERVAL '1' SECOND AND f.proctime + INTERVAL '5' SECOND
WHERE (c.name LIKE '%label1%' OR c.name LIKE '%label2%')
AND (e.id IS NOT NULL OR f.id IS NOT NULL);

This job achieves second-level latency while ensuring data accuracy through careful join ordering and fallback strategies.
Outlook
The current Lambda architecture combines real‑time Flink pipelines with offline Spark 2.4.5 jobs. After migrating to Spark 3.x, Liulishuo expects Hudi to continue serving both batch and streaming storage, further unifying its data platform.
