
Real‑time Data Warehouse Architecture and Technical Solution at Liulishuo

This article describes Liulishuo's migration to a Flink‑based real‑time data warehouse, covering background, benefits, technology selection (storage, Flink platform, dimension table connectors), overall architecture, concrete Hudi and Elasticsearch ingestion examples, processing SQL, and future outlook for unified batch‑streaming storage.

Liulishuo Tech Team

Background

Liulishuo initially built its real‑time data warehouse on Flink 1.9, with YARN for resource management and a custom‑modified SQL Client. Once Flink 1.13 and its SQL connectors matured, the platform was upgraded to Flink on Kubernetes, which in turn prompted a cascade of component upgrades.

What a real‑time data warehouse brings

Data synchronization

Second‑level real‑time processing

A unified, data‑lake‑based architecture serving both real‑time and batch processing

Technical Solution Selection

Data storage scheme

Among Delta Lake, Hudi, and Iceberg, only Hudi and Iceberg were compared in depth, because Flink is the compute engine and Delta Lake is tightly coupled to Spark. The comparison covers cloud vendor support, small‑file handling, stability, SQL support, update/delete capabilities, engine compatibility, and language support.

| Comparison Dimension | Hudi | Iceberg |
| --- | --- | --- |
| Cloud vendor support | Alibaba Cloud, AWS | Alibaba Cloud |
| Small file merging | Automatic | V1 needs manual handling; V2 supports upsert but has merge issues |
| Stability | Stable | Stable |
| SQL support | Supported | Supported |
| Update/delete support | Supported | V2 supports upsert |
| Engine compatibility | Flink, Spark, Trino, Presto | Flink, Spark, Trino, Presto |
| Language support | Java, Scala, Python, SQL | Java, Scala, Python, SQL |

Because Iceberg V1 lacks upsert and V2 has small‑file merge problems, Hudi was selected as the lake storage solution.

Flink development platform

The self‑managed Hadoop stack on Alibaba Cloud EMR incurred high operational cost. After evaluation, Alibaba Cloud Realtime Compute for Apache Flink (VVP) was chosen for its Kubernetes‑based deployment, session and application modes, elastic scaling, interactive SQL development UI, high availability, Hive Metastore‑backed metadata management, custom connector and UDF support, checkpointing to OSS, Prometheus monitoring, and support for running jobs on multiple Flink versions.

Dimension table storage

For dimension tables, the candidate options were JDBC, HBase, and Elasticsearch. Their connector capabilities and drawbacks are summarized below.

| Name | Version | Source | Sink |
| --- | --- | --- | --- |
| JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |

JDBC offers direct database lookups without a separate data sync, but can stress the source database under peak load. HBase provides high QPS through RowKey access but only supports key‑based joins. Elasticsearch enables flexible queries with high QPS, but Flink does not officially support it as a dimension (lookup) table, and its lookups can be slower than HBase point queries.
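As an illustration of the lookup pattern rather than the article's own configuration, the sketch below shows a JDBC dimension table with a lookup cache and a processing‑time lookup join; the connection details, table, and field names are hypothetical, and the cache options limit how often the source database is hit.

-- Hypothetical JDBC dimension table with a lookup cache to reduce load on the source DB
CREATE TABLE mysql_dim_user (
  id BIGINT,
  user_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/db_name',
  'table-name' = 'user',
  'username' = 'xxx',
  'password' = 'xxx',
  'lookup.cache.max-rows' = '5000',
  'lookup.cache.ttl' = '10min'
);

-- Processing-time lookup join against the dimension table
SELECT s.id, d.user_name
FROM kafka_stream_source s
JOIN mysql_dim_user FOR SYSTEM_TIME AS OF s.proctime AS d ON s.id = d.id;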

Connector development

A custom Flink‑Elasticsearch connector was built to support both source (scan, lookup) and sink (streaming, batch) operations.

Overall Real‑time Warehouse Architecture

The architecture stores raw streams in Kafka and OSS (Hudi tables). VVP provides unified development, operation, and resource management. Flink handles real‑time processing, Trino enables interactive analytics, Superset generates reports, and Hive Metastore maintains metadata. (Architecture diagram omitted.)
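As a hypothetical illustration of the interactive‑analytics path (not code from the original article): once a Hudi table has been synced to the Hive Metastore, Trino can query it through its Hive‑compatible catalog with ordinary SQL. The catalog, schema, and table names below are placeholders.

-- Hypothetical Trino query over a Hudi table registered in the Hive Metastore
SELECT data_date, COUNT(*) AS record_cnt
FROM hive.hudi.table_name
GROUP BY data_date
ORDER BY data_date;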

Data Development Cases

Data Ingestion

A CDC pipeline streams the MySQL binlog into Kafka in Debezium JSON format.
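As a hedged sketch of what such a change stream looks like on the Flink SQL side (the topic, server addresses, and columns are placeholders), the Kafka source can be declared with the built‑in 'debezium-json' format; the ingestion DML in the next subsection reads from a table of this shape, referred to as kafka_streaming_table.

-- Hypothetical Kafka source table over the Debezium JSON change stream
CREATE TABLE kafka_streaming_table (
  id BIGINT,
  xxx STRING,
  `data_date` VARCHAR(20),
  updated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.db_name.table_name',
  'properties.bootstrap.servers' = 'xxx:9092',
  'properties.group.id' = 'rt_dw_ingestion',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);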

Hudi table ingestion

Flink SQL is used to create the Hudi table and populate it from the Kafka change stream; the DDL and DML statements are shown below.

-- Hudi table DDL statement
CREATE TABLE IF NOT EXISTS `catalog`.`db_name`.`table_name` (
  id BIGINT,
  xxx STRING,
  `data_date` VARCHAR(20),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  -- copy-on-write table: updates rewrite data files, keeping reads simple
  'table.type' = 'COPY_ON_WRITE',
  'write.tasks' = '2',
  -- global index keeps record keys unique across partitions;
  -- bootstrap loads the existing table's index into state on job start
  'index.global.enabled' = 'true',
  'index.bootstrap.enabled' = 'true',
  -- allow downstream jobs to consume this table as a stream
  'read.streaming.enabled' = 'true',
  -- OSS storage location and credentials
  'oss.endpoint' = 'xxx',
  'accessKeyId' = 'xxx',
  'accessKeySecret' = 'xxx',
  'path' = 'oss://xxx',
  -- sync table metadata to the Hive Metastore so other engines can query it
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.db' = 'hudi',
  'hive_sync.metastore.uris' = 'xxx',
  'hive_sync.table' = 'xxx',
  'compaction.tasks' = '1',
  -- keep only the latest 10 commits; older file versions are cleaned up
  'hoodie.cleaner.commits.retained' = '10',
  -- on record-key collisions the row with the larger updated_at wins
  'hoodie.datasource.write.precombine.field' = 'updated_at',
  'hoodie.datasource.write.recordkey.field' = 'id'
);

-- DML statement
INSERT INTO `catalog`.`db_name`.`table_name`
SELECT id, xxx, `data_date`
FROM kafka_streaming_table
WHERE id IS NOT NULL;

Key points: enable index bootstrapping so the initial load stays consistent with records already in the table, define a PRIMARY KEY so writes become upserts, set the pre‑combine field so the latest update wins on key collisions, configure the Hive sync parameters so the table is visible to other engines through the Hive Metastore, and retain only a limited number of commits so cleanup bounds storage growth.
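Because 'read.streaming.enabled' is set to 'true' in the DDL above, the same Hudi table can also be consumed incrementally by downstream Flink SQL jobs; a minimal sketch, reusing the illustrative table and column names, would be:

-- Streaming read of the Hudi table from a downstream Flink SQL job
SELECT id, xxx, `data_date`
FROM `catalog`.`db_name`.`table_name`;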

Elasticsearch dimension table

The custom connector is packaged and registered on the Alibaba Cloud real‑time platform, then used to create and populate ES tables via Flink SQL.

-- ES table DDL
DROP TABLE IF EXISTS es_dimension01;
CREATE TABLE IF NOT EXISTS es_dimension01 (
  id STRING,
  xxx STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch6-lls',
  'format' = 'json',
  'hosts' = 'http://xxx:9200',
  'index' = 'es_dimension01',
  'username' = 'xxx',
  'password' = 'xxx'
);

-- DML: read from Kafka and write to Elasticsearch
INSERT INTO es_dimension01
SELECT id, xxx FROM kafka_xxx_dimension;

The ES connector supports upsert by mapping the PRIMARY KEY to the document ID, allowing updates and deletions through ID‑based operations.

Data Processing

A labeling job consumes Kafka streams, performs lookup joins with multiple ES dimension tables, uses an interval join for a fast‑changing dimension, and writes enriched records back to a Kafka sink. The full INSERT…SELECT statement is shown below.

INSERT INTO kafka_sink
SELECT
  CAST(a.id AS BIGINT) AS id,
  'lable0' AS lable,
  -- prefer the fast-changing Kafka dimension (f); fall back to the ES dimension (e)
  CASE WHEN f.id IS NOT NULL THEN f.start_sec * 1000 ELSE e.start_sec * 1000 END AS sourceTime,
  CASE WHEN f.id IS NOT NULL THEN f.end_sec * 1000 ELSE e.end_sec * 1000 END AS expiredAt,
  a.updated_at AS updated,
  'dmp' AS type
FROM kafka_stream_source a
-- lookup joins against Elasticsearch dimension tables at processing time
JOIN es_llspay_order_items FOR SYSTEM_TIME AS OF a.proctime AS b ON a.id = b.id
JOIN es_dimension01 FOR SYSTEM_TIME AS OF a.proctime AS c ON b.id = c.id
JOIN es_dimension02 FOR SYSTEM_TIME AS OF a.proctime AS d ON a.id = d.id
LEFT JOIN es_dimension03 FOR SYSTEM_TIME AS OF a.proctime AS e ON e.id = d.id
-- interval join with a fast-changing Kafka dimension stream
LEFT JOIN kafka_stream_dimension01 f ON CAST(f.id AS STRING) = d.id
  AND a.proctime BETWEEN f.proctime - INTERVAL '1' SECOND AND f.proctime + INTERVAL '5' SECOND
WHERE (c.name LIKE '%label1%' OR c.name LIKE '%label2%')
  AND (e.id IS NOT NULL OR f.id IS NOT NULL);

This job achieves second‑level latency while ensuring data accuracy through careful join ordering and fallback strategies.

Outlook

The current Lambda architecture combines real‑time Flink pipelines with offline Spark 2.4.5 jobs. After migrating to Spark 3.x, Liulishuo expects Hudi to continue serving both batch and streaming storage, further unifying its data platform.

Tags: Flink SQL, Elasticsearch, Kafka, Real-time Data Warehouse, Hudi