How to Build a Real‑Time Data Warehouse with EasyData: A Step‑by‑Step Guide
Learn how to design and implement a real‑time data warehouse for an app’s AB‑test monitoring using EasyData, covering data flow layers, CDC task creation, stream table registration, Flink SQL processing, and BI reporting, with detailed steps, code snippets, and practical tips.
Background and Problem
Many data teams start real‑time business development with siloed code and single‑task pipelines, lacking proper management and layering of real‑time data warehouses. As real‑time scenarios grow, requirements such as data reuse and self‑service querying emerge, demanding systematic real‑time data governance similar to offline data.
Business Data Layering
The typical real‑time data flow includes ingestion, processing, and storage. In the AB‑test example, the pipeline consists of:
ODS layer: raw logs written to Kafka, retained for 7 days.
DWD layer: cleaned data written back to Kafka via Flink.
DWS layer: aggregated data stored in Kudu for downstream consumption.
Impala queries the DWS layer to generate reports.
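The ODS-to-DWD step above can be sketched in open-source Flink SQL. This is only an illustration: the table names, topics, columns, broker address, and cleaning rule are all hypothetical, since the article does not show the AB-test schema.

```sql
-- Hypothetical ODS table: raw AB-test logs in Kafka.
-- The 7-day retention is a Kafka topic setting (retention.ms), not a Flink option.
CREATE TABLE ods_abtest_log (
  user_id    STRING,
  experiment STRING,
  variant    STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_abtest_log',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'dwd_abtest_cleaning',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Hypothetical DWD table: cleaned records written back to Kafka.
CREATE TABLE dwd_abtest_log (
  user_id    STRING,
  experiment STRING,
  variant    STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_abtest_log',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- Assumed cleaning rule: drop records that lack a user or an experiment.
INSERT INTO dwd_abtest_log
SELECT user_id, experiment, variant, event_time
FROM ods_abtest_log
WHERE user_id IS NOT NULL
  AND experiment IS NOT NULL;
```

Keeping the DWD output in Kafka lets several downstream jobs reuse the cleaned stream instead of each re-reading and re-cleaning the raw topic.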
Solution with EasyData
EasyData’s real‑time development module introduces stream tables, a metadata abstraction for message‑oriented sources such as Kafka or RocketMQ that lack a fixed schema. Users register a stream table once and can reference it directly in Flink jobs, eliminating repetitive DDL definitions and improving data lineage visibility.
Future enhancements will add lineage tracing, preview, and model‑driven table creation, further simplifying real‑time warehouse management.
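For comparison, without a stream table each Flink job that reads a topic would typically carry its own source DDL. A minimal sketch in open-source Flink SQL follows; the table name, topic, broker address, and columns are assumptions for illustration, not EasyData output.

```sql
-- Hypothetical DDL that every job would otherwise repeat for the same Kafka topic.
CREATE TABLE app_visit_log (
  id        BIGINT,
  visitTime STRING,
  equipment STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'app_visit_log',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'example001',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json'
);
```

With a registered stream table, the schema and connection details live in the platform's metadata, and a job only overrides per-job settings such as the consumer group and the startup offset.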
Case Scenario: Real‑Time DAU Statistics
The demo computes daily active users (DAU) for an app, broken down by device type (iPhone, Huawei, OPPO, other). The pipeline follows the same layering as above, with MySQL rather than Kudu as the result store: CDC → ODS (Kafka) → Flink cleaning and aggregation → MySQL storage → BI reporting.
Step‑by‑Step Implementation
3.1 Prepare Simulation Data
Create a MySQL source table DAU_DS for raw visit logs and a result table DAU_FINAL for aggregated counts. Sample schemas are shown in the images below.
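As a rough guide, the two tables might look like the MySQL DDL below. The column names are taken from the Flink SQL later in this guide; the types, lengths, and sample rows are assumptions.

```sql
-- Source table for raw visit logs (types are assumed).
CREATE TABLE DAU_DS (
  id        BIGINT AUTO_INCREMENT PRIMARY KEY,
  visitTime VARCHAR(19) NOT NULL,   -- assumed format: 'yyyy-MM-dd HH:mm:ss'
  equipment VARCHAR(32) NOT NULL    -- 'iPhone', 'Huawei', 'OPPO' or 'other'
);

-- Result table. The column order follows the SELECT list of the final view,
-- because INSERT INTO ... SELECT * maps columns by position.
CREATE TABLE DAU_FINAL (
  totalDAU  BIGINT,
  submitter VARCHAR(64) NOT NULL,
  `date`    VARCHAR(10) NOT NULL,
  iPhoneDAU BIGINT,
  HuaweiDAU BIGINT,
  oppoDAU   BIGINT,
  otherDAU  BIGINT,
  PRIMARY KEY (submitter, `date`)
);

-- A few made-up rows to drive the pipeline.
INSERT INTO DAU_DS (visitTime, equipment) VALUES
  ('2022-06-01 09:15:00', 'iPhone'),
  ('2022-06-01 09:20:00', 'Huawei'),
  ('2022-06-01 10:05:00', 'OPPO'),
  ('2022-06-01 11:30:00', 'other');
```

The composite primary key on DAU_FINAL matches the 'submitter,date' key declared in the Flink job, so repeated updates for the same day overwrite a single row.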
3.2 Create CDC Task
Define a CDC job that captures changes from the MySQL source and writes them to a Kafka topic. Configure the source table name (DAU_DS) and choose either “latest offset” for incremental data or “full initialization” for historical back‑fill.
Set the target as a Kafka topic (auto‑created if absent) with canal-json serialization, then save and publish the task.
Start the task from the operations console.
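EasyData configures the CDC task through its UI, and the article does not describe how it runs internally. For readers who want a mental model, a comparable job in open-source Flink SQL with the Flink CDC MySQL connector could look like the sketch below; the host, credentials, database, topic, and column types are all placeholders.

```sql
-- Hypothetical analog of the CDC task: MySQL change stream as a Flink source.
CREATE TABLE dau_ds_source (
  id        BIGINT,
  visitTime STRING,
  equipment STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'cdc_user',
  'password' = 'cdc_password',
  'database-name' = 'demo_db',
  'table-name' = 'DAU_DS',
  -- 'initial' snapshots existing rows first, then reads changes;
  -- 'latest-offset' reads only changes made after the job starts.
  'scan.startup.mode' = 'initial'
);

-- Kafka topic that receives the change events in canal-json format.
CREATE TABLE dau_ods_sink (
  id        BIGINT,
  visitTime STRING,
  equipment STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dau_ds_cdc',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'canal-json'
);

INSERT INTO dau_ods_sink
SELECT id, visitTime, equipment
FROM dau_ds_source;
```

In this analogy, 'initial' plays the role of “full initialization” and 'latest-offset' the role of “latest offset” in the task settings.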
3.3 Register ODS Stream Table
In EasyData’s “Stream Table” section, create a new stream table pointing to the CDC‑generated Kafka topic. Choose automatic field parsing; the platform will infer the schema from sample messages.
After verification, save the stream table.
3.4 Create Flink SQL Task
Select the FLINK‑1.14 engine and write SQL that aggregates daily DAU per device type. Reference the stream table using the [database].[table] notation.
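Key modifications required in the script:
Replace the stream‑table name placeholder in each SET key, and set the consumer group ID to a value of your own.
Replace the stream‑table database and table names in the FROM clause.
Set the submitter field to your name.
Replace the MySQL data source identifier and database name in the INSERT INTO statement.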
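-- Set the Kafka consumer group ID. Replace the placeholder with the name of the stream table you created.
-- Format: '{stream table name}.connections.group.id' = '{groupid-name}';
set '<stream_table_name>.connections.group.id' = 'example001';
-- Set the position from which to start reading the message queue.
-- Format: '{stream table name}.scan.startup.mode' = '{earliest-offset / latest-offset}'
set '<stream_table_name>.scan.startup.mode' = 'earliest-offset';
-- Set the stream table primary key. Format: '{stream table name}.primary.keys' = '{primary key name}'
set '<stream_table_name>.primary.keys' = 'id';
-- Set the source table name. SUBSTRING positions are 1-based, so start at 1 to take 'yyyy-MM-dd'.
CREATE VIEW v1 AS
SELECT SUBSTRING(visitTime, 1, 10) AS `date`, equipment
FROM <stream_table_database>.<stream_table_name>;
-- Note: COUNT(equipment) counts log rows per device, so it equals DAU only if each user appears once per day.
CREATE VIEW v2 AS
SELECT `date`, equipment, COUNT(equipment) AS eq_dau
FROM v1
GROUP BY `date`, equipment;
-- Fill in your name in the submitter field, then pivot the per-device counts into columns.
CREATE VIEW v3 AS
SELECT '<your_name>' AS submitter,
`date`,
SUM(CASE equipment WHEN 'iPhone' THEN eq_dau ELSE 0 END) AS iPhoneDAU,
SUM(CASE equipment WHEN 'Huawei' THEN eq_dau ELSE 0 END) AS HuaweiDAU,
SUM(CASE equipment WHEN 'OPPO' THEN eq_dau ELSE 0 END) AS oppoDAU,
SUM(CASE equipment WHEN 'other' THEN eq_dau ELSE 0 END) AS otherDAU
FROM v2
GROUP BY `date`;
-- Add the total across all device types.
CREATE VIEW v4 AS
SELECT (v3.iPhoneDAU + v3.HuaweiDAU + v3.oppoDAU + v3.otherDAU) AS totalDAU,
submitter,
`date`,
iPhoneDAU,
HuaweiDAU,
oppoDAU,
otherDAU
FROM v3;
-- Declare the result table's primary key so updates overwrite the row for the same submitter and date.
set 'DAU_FINAL.primary.keys' = 'submitter,date';
INSERT INTO <mysql_datasource_id>.<mysql_database_name>.DAU_FINAL
SELECT * FROM v4;
Save, publish, and start the SQL job using the same workflow as the CDC task.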
3.5 Build BI Report
In the “YouHaveData” BI tool, create a dashboard that queries DAU_FINAL. Refresh the report to see real‑time updates, visualized as a line chart and a tabular view.
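The dashboard's dataset can be thought of as a simple query over the result table. The sketch below assumes the column names written by the Flink job; the submitter value is a placeholder, and the BI tool may generate different SQL.

```sql
-- Daily totals and the per-device breakdown for one submitter, oldest day first.
SELECT `date`,
       totalDAU,
       iPhoneDAU,
       HuaweiDAU,
       oppoDAU,
       otherDAU
FROM DAU_FINAL
WHERE submitter = 'your_name'
ORDER BY `date`;
```

Plotting `date` against totalDAU gives the line chart, and the full result set serves as the tabular view.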
This end‑to‑end example demonstrates how EasyData stream tables simplify real‑time data reuse, reduce repetitive DDL work, and enable rapid construction of a layered real‑time data warehouse for operational analytics.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
