Streaming Data Lake Warehouse Solution Based on USDP with Flink and Paimon
This article presents UCloud's USDP‑based streaming data lake warehouse solution, which leverages Flink for real‑time processing and Paimon for lake storage. It details the architecture, advantages, and practical scenarios, and provides end‑to‑end SQL and Flink CDC code snippets for implementation.
Background
In the wave of digital transformation, enterprises increasingly demand real‑time data, but traditional offline data warehouses suffer from batch‑oriented updates that cause latency, high storage and compute costs, and limited business agility.
Solution Overview
UCloud proposes a streaming data lake warehouse built on the USDP (UCloud Smart Data Platform) framework. The solution uses Flink as the real‑time processing engine and Paimon as the lake storage format, achieving a unified stream‑batch data management capability that extends the real‑time processing limits of traditional warehouses.
Data Ingestion
Flink MySQL CDC connectors capture MySQL binary log (binlog) changes with minimal intrusion, supporting incremental snapshot reads. This enables low‑latency detection of DML events and real‑time synchronization of database changes.
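To make the change-capture semantics concrete, here is a hypothetical Python sketch of how a consumer replays binlog-style insert/update/delete events into a keyed table state. The event shapes are illustrative and do not reflect the connector's actual API.

```python
# Hypothetical sketch: replaying CDC-style change events into a keyed table.
# Event tuples and row shapes are illustrative, not the Flink CDC connector's API.
def replay_cdc(events):
    """Apply a stream of binlog-style change events to an in-memory table."""
    table = {}
    for op, row in events:
        key = row["order_id"]
        if op in ("insert", "update"):   # upsert semantics keyed by primary key
            table[key] = row
        elif op == "delete":
            table.pop(key, None)
    return table

events = [
    ("insert", {"order_id": 1, "state": 0}),
    ("update", {"order_id": 1, "state": 1}),   # order paid
    ("insert", {"order_id": 2, "state": 0}),
    ("delete", {"order_id": 2}),               # order cancelled
]
print(replay_cdc(events))   # {1: {'order_id': 1, 'state': 1}}
```

Because every DML event carries the primary key, downstream state always converges to the latest row per key, which is what makes low-latency synchronization possible.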
-- Create the order database
CREATE DATABASE IF NOT EXISTS order_db;
USE order_db;
-- Orders table
CREATE TABLE `orders` (
order_id bigint primary key,
user_id bigint,
shop_id bigint,
product_id bigint,
buy_fee bigint,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
state int
);
-- Order payments table
CREATE TABLE `orders_pay` (
pay_id bigint primary key,
order_id bigint,
pay_platform int,
create_time TIMESTAMP(3)
);
-- Product catalog table
CREATE TABLE `product_catalog` (
product_id bigint primary key,
catalog_name varchar(50),
create_time TIMESTAMP(3)
);
Data Generation with Flink DataGen
-- Product catalog
CREATE TABLE product_catalog (
product_id bigint PRIMARY KEY NOT ENFORCED,
catalog_name varchar(50),
create_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'fields.product_id.kind'='sequence',
'fields.product_id.start'='202403000001',
'fields.product_id.end'='202403002000',
'fields.catalog_name.kind'='random',
'fields.catalog_name.length'='6'
);
-- Write product catalog data to MySQL
CREATE TABLE product_catalog_sink (
product_id bigint PRIMARY KEY NOT ENFORCED,
catalog_name varchar(50),
create_time TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://usdp.ucloudstack.com:3306/order_db',
'table-name' = 'product_catalog',
'username' = 'username',
'password' = 'password'
);
INSERT INTO product_catalog_sink SELECT * FROM product_catalog;
-- Orders
CREATE TABLE orders (
order_id bigint primary key not enforced,
user_id bigint,
shop_id bigint,
product_id bigint,
buy_fee bigint,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
state int
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='10001',
'fields.order_id.end'='30000',
'fields.user_id.kind'='random',
'fields.user_id.min'='202401000001',
'fields.user_id.max'='202401002000',
'fields.shop_id.kind'='random',
'fields.shop_id.min'='202402000001',
'fields.shop_id.max'='202402002000',
'fields.product_id.kind'='random',
'fields.product_id.min'='202403000001',
'fields.product_id.max'='202403002000',
'fields.buy_fee.kind'='random',
'fields.buy_fee.min'='1',
'fields.buy_fee.max'='10000',
'fields.state.kind'='random',
'fields.state.min'='0',
'fields.state.max'='1'
);
-- Payments
CREATE TABLE orders_pay (
pay_id bigint primary key not enforced,
order_id bigint,
pay_platform int,
create_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second'='10',
'fields.pay_id.kind'='sequence',
'fields.pay_id.start'='2010001',
'fields.pay_id.end'='2030000',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='10001',
'fields.order_id.end'='30000',
'fields.pay_platform.kind'='random',
'fields.pay_platform.min'='1',
'fields.pay_platform.max'='9'
);
-- Write order data to MySQL
CREATE TABLE orders_sink (
order_id bigint primary key not enforced,
user_id bigint,
shop_id bigint,
product_id bigint,
buy_fee bigint,
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
state int
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://usdp.ucloudstack.com:3306/order_db',
'table-name' = 'orders',
'username' = 'username',
'password' = 'password'
);
INSERT INTO orders_sink SELECT * FROM orders;
-- Write payment data to MySQL
CREATE TABLE orders_pay_sink (
pay_id bigint primary key not enforced,
order_id bigint,
pay_platform int,
create_time TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://usdp.ucloudstack.com:3306/order_db',
'table-name' = 'orders_pay',
'username' = 'username',
'password' = 'password'
);
INSERT INTO orders_pay_sink SELECT * FROM orders_pay;
ODS Layer Construction
Flink synchronizes the MySQL tables (orders, orders_pay, product_catalog) to HDFS in real time, storing them in Paimon format to form the Operational Data Store (ODS) layer. Paimon's mysql_sync_database CDC action performs the whole-database synchronization:
mysql_sync_database
--warehouse hdfs://cluster-c20789/cluster-c20789/user/hive/warehouse
--database order_db
--mysql_conf hostname=usdp.ucloudstack.com
--mysql_conf username=username
--mysql_conf password=password
--mysql_conf database-name=order_db
--table_conf bucket=-1
--table_conf changelog-producer=input
--table_conf sink.parallelism=4
DWD Layer (Wide Table) Construction
The DWD layer merges orders, product_catalog, and orders_pay into a wide table using Paimon's partial‑update mechanism.
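The partial-update semantics can be sketched in a few lines of Python: for rows sharing a primary key, each non-NULL field of a later row overwrites the stored value, so two half-populated streams converge into one wide row. The field names mirror dwd_orders; this is a simplified model of the behavior, not Paimon's implementation.

```python
# Illustrative model of Paimon's partial-update merge engine: non-NULL fields
# of incoming rows overwrite the stored row with the same primary key.
def partial_update(rows, pk):
    merged = {}
    for row in rows:
        key = row[pk]
        cur = merged.setdefault(key, {})
        for field, value in row.items():
            if value is not None:        # NULL never overwrites an existing value
                cur[field] = value
    return merged

rows = [
    # from the orders stream: fee known, payment fields NULL
    {"order_id": 10001, "order_fee": 99, "pay_id": None, "pay_platform": None},
    # from the orders_pay stream: payment known, order fields NULL
    {"order_id": 10001, "order_fee": None, "pay_id": 2010001, "pay_platform": 3},
]
wide = partial_update(rows, "order_id")
print(wide[10001])
# {'order_id': 10001, 'order_fee': 99, 'pay_id': 2010001, 'pay_platform': 3}
```

This is why the UNION ALL below can feed NULL-padded rows from each source: the merge engine stitches them together by order_id.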
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='hdfs://cluster-c20789/cluster-c20789/user/hive/warehouse'
);
USE CATALOG paimon_catalog;
USE order_db;
CREATE TABLE IF NOT EXISTS dwd_orders (
order_id BIGINT,
order_user_id BIGINT,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP(3),
order_update_time TIMESTAMP(3),
order_state INT,
pay_id BIGINT,
pay_platform INT,
pay_create_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update',
'partial-update.ignore-delete' = 'true',
'changelog-producer' = 'lookup'
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO dwd_orders
SELECT
o.order_id,
o.user_id,
o.shop_id,
o.product_id,
dim.catalog_name,
o.buy_fee,
o.create_time,
o.update_time,
o.state,
CAST(NULL AS BIGINT) AS pay_id,
CAST(NULL AS INT) AS pay_platform,
CAST(NULL AS TIMESTAMP(3)) AS pay_create_time
FROM orders o LEFT JOIN product_catalog dim ON o.product_id = dim.product_id
UNION ALL
SELECT
order_id,
CAST(NULL AS BIGINT) AS user_id,
CAST(NULL AS BIGINT) AS shop_id,
CAST(NULL AS BIGINT) AS product_id,
CAST(NULL AS STRING) AS order_product_catalog_name,
CAST(NULL AS BIGINT) AS order_fee,
CAST(NULL AS TIMESTAMP(3)) AS order_create_time,
CAST(NULL AS TIMESTAMP(3)) AS order_update_time,
CAST(NULL AS INT) AS order_state,
pay_id,
pay_platform,
create_time
FROM orders_pay;
DWM Layer Construction
A DWM layer aggregates user‑shop metrics using Paimon's aggregation engine.
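The aggregation merge engine folds rows sharing the primary key (user_id, shop_id, ds) with the configured aggregate function (sum here). The following Python sketch models that behavior under those assumptions; it is not Paimon's internal code.

```python
# Model of the aggregation merge engine: rows with the same composite key are
# folded with sum over the configured fields (payed_buy_fee_sum, pv).
def aggregate_sum(rows, key_fields, sum_fields):
    out = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        acc = out.setdefault(key, {f: 0 for f in sum_fields})
        for f in sum_fields:
            acc[f] += row[f]
    return out

rows = [
    {"user_id": 1, "shop_id": 7, "ds": "20240301", "payed_buy_fee_sum": 50, "pv": 1},
    {"user_id": 1, "shop_id": 7, "ds": "20240301", "payed_buy_fee_sum": 30, "pv": 1},
]
print(aggregate_sum(rows, ("user_id", "shop_id", "ds"), ("payed_buy_fee_sum", "pv")))
# {(1, 7, '20240301'): {'payed_buy_fee_sum': 80, 'pv': 2}}
```

This is why the INSERT below can emit a constant 1 for pv: each paid order contributes one unit, and the merge engine accumulates the per-key totals.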
CREATE TABLE IF NOT EXISTS dwm_users_shops (
user_id BIGINT,
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT 'Total amount the user paid at the shop that day',
pv BIGINT COMMENT 'Number of purchases the user made at the shop that day',
PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.payed_buy_fee_sum.aggregate-function' = 'sum',
'fields.pv.aggregate-function' = 'sum',
'changelog-producer' = 'lookup',
'file.format' = 'avro',
'metadata.stats-mode' = 'none'
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO dwm_users_shops
SELECT
order_user_id,
order_shop_id,
DATE_FORMAT(pay_create_time, 'yyyyMMdd') AS ds,
order_fee,
1
FROM dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
DWS Layer Construction
The DWS layer materializes user‑level and shop‑level aggregates for downstream analytics.
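The shop-level rollup can be sketched as follows: each (user_id, shop_id, ds) row from the DWM layer contributes its fee sum, a constant 1 toward uv (one distinct user per DWM row), and its pv to the (shop_id, ds) aggregate, mirroring the INSERT INTO dws_shops statement below. This is an illustrative model under those assumptions.

```python
# Model of the DWM -> dws_shops rollup: aggregate per (shop_id, ds), counting
# each user-shop-day row once toward uv (distinct users).
def rollup_shops(dwm_rows):
    shops = {}
    for r in dwm_rows:
        key = (r["shop_id"], r["ds"])
        acc = shops.setdefault(key, {"payed_buy_fee_sum": 0, "uv": 0, "pv": 0})
        acc["payed_buy_fee_sum"] += r["payed_buy_fee_sum"]
        acc["uv"] += 1                      # one row per distinct user-shop-day
        acc["pv"] += r["pv"]
    return shops

dwm_rows = [
    {"user_id": 1, "shop_id": 7, "ds": "20240301", "payed_buy_fee_sum": 80, "pv": 2},
    {"user_id": 2, "shop_id": 7, "ds": "20240301", "payed_buy_fee_sum": 40, "pv": 1},
]
print(rollup_shops(dwm_rows))
# {(7, '20240301'): {'payed_buy_fee_sum': 120, 'uv': 2, 'pv': 3}}
```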
CREATE TABLE IF NOT EXISTS dws_users (
user_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT 'Total amount the user paid that day',
PRIMARY KEY (user_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.payed_buy_fee_sum.aggregate-function' = 'sum'
);
CREATE TABLE IF NOT EXISTS dws_shops (
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT 'Total amount paid at the shop that day',
uv BIGINT COMMENT 'Number of distinct purchasing users that day',
pv BIGINT COMMENT 'Total number of purchases that day',
PRIMARY KEY (shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.payed_buy_fee_sum.aggregate-function' = 'sum',
'fields.uv.aggregate-function' = 'sum',
'fields.pv.aggregate-function' = 'sum'
);
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
BEGIN STATEMENT SET;
INSERT INTO dws_users SELECT user_id, ds, payed_buy_fee_sum FROM dwm_users_shops;
INSERT INTO dws_shops SELECT shop_id, ds, payed_buy_fee_sum, 1, pv FROM dwm_users_shops;
END;
Advantages
Real‑time low‑latency queries (seconds instead of hours).
Efficient data updates with Paimon's LSM‑tree and upsert support.
Simplified processing pipelines using Flink SQL.
Flexible merge strategies via Paimon's merge‑engine.
Incremental data generation and fast data lake storage.
Broad engine compatibility (Flink, Spark, StarRocks, Doris, Trino).
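The update-efficiency point rests on Paimon's LSM layout: writes only append new sorted runs, and a read merges runs newest-first so the latest value per key wins. The toy merge-on-read below illustrates that idea only; Paimon's actual file and compaction machinery is far richer.

```python
# Toy merge-on-read over LSM-style sorted runs: newer runs overwrite older ones,
# so an upsert is just an append. Purely illustrative, not Paimon internals.
def merge_on_read(runs):
    """runs: list of {key: value} dicts, ordered oldest first."""
    merged = {}
    for run in runs:                 # later (newer) runs overwrite earlier ones
        merged.update(run)
    return dict(sorted(merged.items()))

runs = [
    {10001: "created", 10002: "created"},   # older run
    {10001: "paid"},                        # newer run: upsert of key 10001
]
print(merge_on_read(runs))   # {10001: 'paid', 10002: 'created'}
```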
Practical Scenarios
The solution is demonstrated with an e‑commerce platform that captures MySQL order streams via Flink CDC, processes them in real time, and provides dashboards for user payment, shop sales, ranking, and product reports.
Key queries include real‑time user payment view, shop sales details, top‑shop ranking, order detail drill‑down, and product category sales aggregation, all executed via Flink SQL or compatible query engines.
Conclusion
The USDP streaming data lake warehouse enables near‑real‑time data ingestion, processing, and analytics, reducing latency and cost while improving decision‑making for e‑commerce businesses.
DataFunTalk
Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.