
How Paimon Transforms Membership Data Warehousing: From Legacy Lambda to Real‑Time Lakehouse

This article examines the challenges of a legacy Lambda‑based membership data warehouse, introduces Apache Paimon’s lakehouse architecture and its key features, and showcases three real‑world implementations—partial‑update order wide tables, Bitmap‑based UV counting, and branch‑based data correction—while discussing benefits, remaining challenges, and future directions.


1. Original Membership Data Warehouse Architecture, Background, and Pain Points

1.1 Background

The member middle platform supports subscription, ordering, refunds, and profiling for all of the company's internet products. Rapid growth from tens of millions to hundreds of millions of members drove up data volume and complexity, demanding stronger real-time performance, accuracy, and flexibility, and prompted a joint effort with Qilin Cloud to build a stream-batch unified warehouse on Paimon.

1.2 Original Data Architecture

1.3 Pain Points

Too many components, complex pipelines: The traditional Lambda stack spans DataX, StarRocks, Canal, ZooKeeper, Kafka, Flink, Spark, HDFS, Hive, and more, multiplying failure points, operational burden, learning curve, and cost; debugging real-time pipelines is especially painful.

Data redundancy, low reusability: Separate real-time and batch engines prevent data sharing; duplicated storage and duplicated business logic drive up storage and compute costs.

High offline latency, expensive real-time: Spark-based ETL runs in T-1 mode, so offline data is a day behind; real-time storage on SSD is costly, which limits long-term trend analysis.

High development cost: The black-box nature of real-time jobs forces duplicate batch development just to verify results.

Data inconsistency: Maintaining the same logic in two parallel pipelines leads to synchronization errors and unreliable data.

2. Paimon Features and Data Lake Architecture

Apache Paimon is a streaming data lake platform that combines real‑time compute with lakehouse benefits.

2.1 Paimon Data Lake Architecture

The new architecture consists of:

Data Source Layer: Unchanged; includes business databases, logs, etc.

Ingestion Layer: Flink CDC captures database changes; Kafka receives logs (a minimal ingestion sketch follows this list).

Storage Layer: Paimon stores raw data, cleaned detail data, and aggregated metrics.

Compute Layer: Flink handles real-time processing; Spark handles batch.

Service Layer: Trino or StarRocks query Paimon via PaimonCatalog.
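
As an illustration of the ingestion layer, here is a minimal Flink CDC sketch, assuming a MySQL source; the hostname, credentials, and column list are placeholders rather than the production setup.

-- Hypothetical MySQL CDC source (connection details are placeholders)
CREATE TABLE mysql_order_info (
    order_id STRING,
    product STRING,
    order_status INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx',
    'port'='3306',
    'username'='xxx',
    'password'='xxx',
    'database-name'='member',
    'table-name'='order_info'
);
-- Continuously replicate changes into the Paimon ODS layer
INSERT INTO `ods`.`order_info` (order_id, product, order_status)
SELECT order_id, product, order_status FROM mysql_order_info;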

2.2 Architecture Advantages

3. Practical Cases

3.1 Case 1: Partial‑Update Real‑Time Order Wide Table

3.1.1 Business Scenario

Order, product, device, coupon, and refund data need to be merged into one wide table to simplify analysis and avoid high-latency joins.

3.1.2 Implementation

Upstream Data Sources

Multiple source tables are written to the Paimon ODS layer via Flink CDC, using the deduplicate merge engine.

-- Base order info table
CREATE TABLE ods.order_info (
    order_id string COMMENT 'order ID',
    product string COMMENT 'product name',
    qid string COMMENT 'user ID',
    order_create_time timestamp(3) COMMENT 'order creation time',
    order_end_time timestamp(3) COMMENT 'order end time',
    order_status int COMMENT 'order status',
    order_real_fee decimal(10,2) COMMENT 'actual amount paid',
    sku_id int COMMENT 'SKU ID',
    ...
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'='paimon',
    'merge-engine'='deduplicate',
    ...
);
-- Order device info table
CREATE TABLE ods.order_device_info (
    order_id string COMMENT 'order ID',
    mid string COMMENT 'device ID',
    client_type string COMMENT 'client type',
    android_id string COMMENT 'Android device ID',
    idfa string COMMENT 'IDFA',
    client_version string COMMENT 'client version',
    os_name string COMMENT 'operating system name',
    ip string COMMENT 'IP address',
    ...
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'='paimon',
    'merge-engine'='deduplicate',
    ...
);
-- Order coupon info table
CREATE TABLE ods.order_coupon_info (
    order_id string COMMENT 'order ID',
    coupon_code string COMMENT 'unique coupon code',
    qid string COMMENT 'claiming user ID',
    coupon_batch string COMMENT 'coupon batch code',
    ctime timestamp(3) COMMENT 'coupon claim time',
    coupon_type int COMMENT 'coupon type',
    coupon_status int COMMENT 'coupon status',
    coupon_discount string COMMENT 'coupon discount/deduction value',
    ...
    PRIMARY KEY (coupon_code) NOT ENFORCED
) WITH (
    'connector'='paimon',
    'merge-engine'='deduplicate',
    ...
);
-- Order refund info table
CREATE TABLE ods.order_refund_info (
    order_id string COMMENT 'order ID',
    refund_mer_trade_code string COMMENT 'refund transaction number',
    qid string COMMENT 'user ID',
    refund_type int COMMENT 'refund type',
    refund_status int COMMENT 'refund status',
    refund_fee decimal(10,2) COMMENT 'refund amount',
    refund_time timestamp(3) COMMENT 'refund time',
    refund_way int COMMENT 'refund payment method',
    ...
    PRIMARY KEY (refund_mer_trade_code) NOT ENFORCED
) WITH (
    'connector'='paimon',
    'merge-engine'='deduplicate',
    ...
);

Order Wide Table Definition

-- Order wide table
CREATE TABLE dwd.enrich_order (
    order_id string COMMENT 'order ID',
    -- order base info
    product string COMMENT 'product name',
    qid string COMMENT 'user ID',
    order_create_time timestamp(3) COMMENT 'order creation time',
    order_end_time timestamp(3) COMMENT 'order end time',
    order_status int COMMENT 'order status',
    order_real_fee decimal(10,2) COMMENT 'actual amount paid',
    sku_id int COMMENT 'SKU ID',
    -- order device info
    mid string COMMENT 'device ID',
    client_type string COMMENT 'client type',
    android_id string COMMENT 'Android device ID',
    idfa string COMMENT 'IDFA',
    client_version string COMMENT 'client version',
    os_name string COMMENT 'operating system name',
    ip string COMMENT 'IP address',
    -- order coupon info
    coupon_code string COMMENT 'unique coupon code',
    coupon_batch string COMMENT 'coupon batch code',
    coupon_type int COMMENT 'coupon type',
    coupon_status int COMMENT 'coupon status',
    coupon_discount string COMMENT 'coupon discount/deduction value',
    -- order refund info
    g_refund_seq int COMMENT 'sequence field for the refund group',
    refund_info array<row<refund_mer_trade_code string, refund_type int, refund_status int, refund_fee decimal(10,2), refund_time timestamp(3), refund_way int>> COMMENT 'refund info',
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector'='paimon',
    'merge-engine'='partial-update',
    'write-only'='true',
    'changelog-producer'='lookup',
    'fields.g_refund_seq.sequence-group'='refund_info',
    'fields.refund_info.aggregate-function'='nested_update',
    'fields.refund_info.nested-key'='refund_mer_trade_code'
);

Key Configuration

partial-update merge engine: Updates only the written columns without overwriting whole rows.

nested_update aggregation: Merges refund rows into an array keyed by refund_mer_trade_code.

changelog-producer=lookup: Produces a changelog so downstream jobs can stream-read changes (a streaming-read sketch follows this list).

write-only mode: Skips compaction inside the write job to improve write performance; small-file compaction is handled by a dedicated compaction job.
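
Because the changelog producer is set to lookup, any downstream Flink job can consume the wide table as a changelog stream. A minimal sketch, assuming default settings otherwise:

-- Streaming read of the wide table's changelog (illustrative):
-- the lookup changelog producer emits +I/-U/+U/-D rows downstream.
SET 'execution.runtime-mode' = 'streaming';
SELECT order_id, order_status, refund_info
FROM `dwd`.`enrich_order` /*+ OPTIONS('scan.mode'='latest') */;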

FlinkSQL Multi‑Stream Write

create catalog paimon with (
    'type'='paimon',
    'metastore'='hive',
    'uri'='thrift://xxxxx',
    'warehouse'='hdfs://xxx/paimon/warehouse'
);
use catalog paimon;
execute statement set begin;
-- Insert order base fields
insert into `dwd`.`enrich_order` (order_id, product, order_create_time, order_end_time, order_status, order_real_fee, sku_id)
select order_id, product, order_create_time, order_end_time, order_status, order_real_fee, sku_id from `ods`.`order_info` /*+ OPTIONS('scan.parallelism'='20') */;
-- Insert device fields
insert into `dwd`.`enrich_order` (order_id, mid, client_type, android_id, idfa, client_version, os_name, ip)
select order_id, mid, client_type, android_id, idfa, client_version, os_name, ip from `ods`.`order_device_info` /*+ OPTIONS('scan.parallelism'='20') */;
-- Insert coupon fields
insert into `dwd`.`enrich_order` (order_id, coupon_code, coupon_batch, coupon_type, coupon_status, coupon_discount)
select order_id, coupon_code, coupon_batch, coupon_type, coupon_status, coupon_discount from `ods`.`order_coupon_info` /*+ OPTIONS('scan.parallelism'='10') */;
-- Insert refund fields
insert into `dwd`.`enrich_order` (order_id, g_refund_seq, refund_info)
select order_id, 1 as g_refund_seq,
       array[row(refund_mer_trade_code, refund_type, refund_status, refund_fee, refund_time, refund_way)] as refund_info
from `ods`.`order_refund_info` /*+ OPTIONS('scan.parallelism'='3') */;
end;

The SQL is straightforward: each stream writes only its own fields, with no joins, making the code easy to understand and maintain.

3.1.3 Technical Comparison

Paimon Partial‑Update Advantages

True column-level updates: SequenceGroup resolves out-of-order updates (a toy example follows this list).

Low-latency updates: No waiting for all streams to arrive and no join state to manage.

Multi-stream support: Several streams update the same row simultaneously with automatic conflict handling.

Aggregation functions: Supports sum, max, nested_update, and others during updates.

Changelog integration: Generates a changelog for downstream streaming reads.

Data back-trace: A single stream can be rolled back immediately without affecting the other streams.
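
To make the SequenceGroup behavior concrete, here is a toy example on a hypothetical table (not part of the case above), run in batch mode for illustration:

-- A late event with a smaller sequence value cannot overwrite a newer one.
CREATE TABLE demo.seq_group_demo (
    order_id STRING,
    g_seq INT,
    coupon_code STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'merge-engine'='partial-update',
    'fields.g_seq.sequence-group'='coupon_code'
);
INSERT INTO demo.seq_group_demo VALUES ('o1', 2, 'B'); -- newer update arrives first
INSERT INTO demo.seq_group_demo VALUES ('o1', 1, 'A'); -- late event: 1 < 2, so coupon_code stays 'B'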

Traditional Flink Dual‑Stream Join Pain Points

High data latency due to waiting for both streams (a simplified join sketch follows this list).

Complex state management leading to memory pressure.

Severe data skew on hot keys.

Expensive fault‑tolerance with snapshot overhead.

High development complexity (windows, state expiration).

High storage cost for duplicated streams and state.
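
For contrast, a sketch of the dual-stream join this design replaces, reduced to two of the five streams; the table names and interval bounds are illustrative, and every additional source would multiply the join state:

-- Classic interval join between two streams (illustrative):
-- both sides must be buffered in state until the window closes.
INSERT INTO `dwd`.`enrich_order_joined`
SELECT o.order_id, o.product, d.mid, d.client_type
FROM order_stream o
JOIN device_stream d
  ON o.order_id = d.order_id
 AND d.event_time BETWEEN o.event_time - INTERVAL '1' HOUR
                      AND o.event_time + INTERVAL '1' HOUR;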

3.2 Case 2: Bitmap for Precise UV Counting

3.2.1 Business Scenario

Unique users (UV) must be counted across multiple dimensions (product, member level, hour, day) for cashier exposure analytics.

3.2.2 Traditional Approach

Cube or grouping-sets pre-aggregation causes exponential computation growth (see the sketch after this list).

Query requires specifying every dimension value.

Lacks flexibility for arbitrary dimension combinations.
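
To see where the exponential growth comes from, consider the pre-aggregation a CUBE approach needs; member_level is an illustrative dimension name here:

-- CUBE over n dimensions materializes 2^n grouping combinations;
-- 4 dimensions already mean 16 aggregates, and each new dimension doubles that.
SELECT dt, hh, product, member_level,
       count(distinct mid) AS uv
FROM dwd.cashier_imp_event
GROUP BY CUBE (dt, hh, product, member_level);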

3.2.3 Bitmap Solution

The Bitmap type provides exact deduplication that can be re-aggregated as simply as a SUM.

Bit-level representation saves storage (roughly 1/32 of storing raw IDs for COUNT(DISTINCT)).

Bitwise operations make it faster than COUNT(DISTINCT).

Non-numeric types need a dictionary mapping to integers, adding some development effort.

3.2.4 Implementation

Dictionary Table

The first-row merge engine keeps the first mid_int ever assigned to each mid, so the mapping stays stable even when the same device appears repeatedly; ignore-delete shields the dictionary from upstream delete records.

CREATE TABLE `dict`.`mid_mapping` (
    mid STRING NOT NULL COMMENT 'device mid',
    mid_int BIGINT COMMENT 'numeric mapping of device mid',
    PRIMARY KEY (mid) NOT ENFORCED
) COMMENT 'mid dictionary table' WITH (
    'merge-engine'='first-row',
    'ignore-delete'='true',
    ...
);

Flink UDF for Snowflake ID

package cn.qihoo360.member.udf.flink;

import com.github.yitter.contract.IdGeneratorOptions;
import com.github.yitter.idgen.YitIdHelper;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.lang.reflect.Field;
import java.util.Random;

public class AutoIDGeneratorUDF extends ScalarFunction {
    @Override
    public void open(FunctionContext fc) throws Exception {
        super.open(fc);
        // Use the subtask index as the snowflake worker id so that
        // ids generated by parallel subtasks never collide.
        short workerId = extractTaskIndex(fc);
        IdGeneratorOptions options = new IdGeneratorOptions(workerId);
        options.WorkerIdBitLength = 15; // supports up to 2^15 parallel workers
        options.SeqBitLength = 7;       // 2^7 ids per worker per time unit
        YitIdHelper.setIdGenerator(options);
    }

    @DataTypeHint("BIGINT")
    public long eval() {
        return YitIdHelper.nextId();
    }

    @Override
    public boolean isDeterministic() {
        // Each call returns a fresh id; prevent the planner from
        // constant-folding or caching the result.
        return false;
    }

    // FunctionContext does not expose the subtask index directly,
    // so read the wrapped RuntimeContext via reflection.
    private short extractTaskIndex(FunctionContext fc) throws Exception {
        Field field = fc.getClass().getDeclaredField("context");
        field.setAccessible(true);
        Object context = field.get(fc);
        if (context == null) {
            // No runtime context (e.g. constant folding at plan time):
            // fall back to a random worker id.
            System.out.println("AutoIDGeneratorUDF: context is null, use random int");
            return (short) new Random().nextInt(Short.MAX_VALUE);
        }
        RuntimeContext runtimeContext = (RuntimeContext) context;
        return (short) runtimeContext.getTaskInfo().getIndexOfThisSubtask();
    }
}

FlinkSQL Write Dictionary

create temporary function auto_id as 'cn.qihoo360.member.udf.flink.AutoIDGeneratorUDF';
insert into `dict`.`mid_mapping`
select mid, auto_id() as mid_int from `dwd`.`cashier_imp_event` /*+ OPTIONS('scan.parallelism'='10') */;

Downstream Aggregation Table

CREATE TABLE `dws`.`cashier_imp_hi` (
    dt string COMMENT 'date',
    hh string COMMENT 'hour',
    product string COMMENT 'product name',
    pv int COMMENT 'cashier exposure PV',
    uv_bitmap varbinary COMMENT 'cashier exposure UV (bitmap)',
    PRIMARY KEY (dt, hh, product) NOT ENFORCED
) COMMENT 'cashier exposure traffic statistics'
PARTITIONED BY (dt, hh)
WITH (
    'merge-engine'='aggregation',
    'fields.pv.aggregate-function'='sum',
    'fields.uv_bitmap.aggregate-function'='last_non_null_value'
);

SparkSQL Aggregation

set spark.sql.sources.partitionOverwriteMode=dynamic;
insert overwrite table `dws`.`cashier_imp_hi` partition(dt, hh)
select dt,
       hh,
       product,
       sum(1) as pv,
       bitmap_agg(t2.mid_int) as uv_bitmap
from `dwd`.`cashier_imp_event` t1
left join `dict`.`mid_mapping` t2 on t1.mid = t2.mid
where coalesce(t1.mid, '') != ''
group by 1,2,3;

StarRocks OLAP Query

select dt, hh, product,
       sum(pv) as pv,
       bitmap_union_count(bitmap_from_binary(uv_bitmap)) as uv
from paimon.dws.cashier_imp_hi
group by dt, hh, product;
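
Because bitmaps union losslessly, coarser granularities can be derived from the hourly table without touching the detail data; for example, daily UV per product with the same StarRocks functions:

-- Daily UV per product, rolled up from hourly bitmaps by lossless union
select dt, product,
       bitmap_union_count(bitmap_from_binary(uv_bitmap)) as uv_day
from paimon.dws.cashier_imp_hi
group by dt, product;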

3.3 Case 3: Branch Feature for Data Correction

3.3.1 Business Scenario

Real-time streams may produce inaccurate data due to out-of-order events or late dimension updates; traditionally, a separate batch job is needed for correction.

3.3.2 Paimon Branch Advantages

3.3.3 Solution Steps

Step 1: Create a streaming branch and set it as the fallback branch. With scan.fallback-branch configured, batch reads of the main branch fall back to the streaming branch for partitions that the main branch does not yet contain.

-- Create streaming branch
CALL sys.create_branch('dwd.enrich_refund', 'streaming');
-- Set fallback branch
ALTER TABLE `dwd`.`enrich_refund` SET (
    'scan.fallback-branch'='streaming'
);

Step 2: Real‑time Flink writes to streaming branch

insert into `dwd`.`enrich_refund$branch_streaming`
select refund_mer_trade_code,
       date_format(refund_time, 'yyyy-MM-dd') as dt,
       order_id as refund_order_id,
       refund_time,
       qid,
       refund_status,
       ...
from dwd.enrich_order /*+ OPTIONS('scan.parallelism'='40') */
cross join unnest(refund_info) as t(refund_mer_trade_code, refund_time, refund_status, ...)
where refund_time >= timestamp '${dt}';

Step 3: Batch Flink writes to main branch for incremental correction

set 'execution.runtime-mode'='batch';
insert into `dwd`.`enrich_refund`
select refund_mer_trade_code,
       date_format(refund_time, 'yyyy-MM-dd') as dt,
       order_id as refund_order_id,
       refund_time,
       qid,
       refund_status,
       ...
from dwd.enrich_order /*+ OPTIONS('scan.parallelism'='40') */
cross join unnest(refund_info) as t(refund_mer_trade_code, refund_time, refund_status, ...)
where dt < '${dt}';

Step 4: Verify via StarRocks or Trino

select dt, count(1) from paimon.dwd.enrich_refund
where dt >= date_sub(current_date, interval 3 day)
group by dt order by dt;

4. Benefits and Challenges

4.1 Transaction Domain Warehouse Upgrade

Cost reduction up to 48% and development efficiency improvement of 30%-50%.

4.2 Traffic Domain Warehouse Upgrade

Estimated cost reduction of 30%.

4.3 Current Challenges of Paimon

Ecosystem maturity: Fewer integration tools compared with the Hive or Spark ecosystems.

Learning curve: Teams need deeper knowledge of Flink, Spark, and the storage layer.

Sub-second latency: End-to-end freshness is bounded by Flink checkpoint intervals, so true sub-second latency remains out of reach.

LookupJoin limitations: Very large or frequently updated dimension tables still rely on external KV stores such as HBase or Redis (see the sketch after this list).

Documentation and community: Documentation is sparse and community response can be slow.
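
For reference, the pattern in question is Flink's lookup join against a Paimon dimension table; a sketch, assuming a hypothetical dim.sku_info table and a processing-time attribute proc_time on the stream:

-- Lookup join sketch: enriching a stream from a Paimon dimension table.
-- Fine for small-to-medium dimensions; very large or hot tables are
-- where external KV stores still perform better.
SELECT o.order_id, o.sku_id, s.sku_name
FROM order_stream AS o
JOIN `dim`.`sku_info` FOR SYSTEM_TIME AS OF o.proc_time AS s
  ON o.sku_id = s.sku_id;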

5. Future Outlook

5.1 Technical Optimization Directions

Performance tuning: Refine parameters to boost read, write, and query efficiency.

Ecosystem enrichment: Deepen integration with Spark, Flink, and StarRocks.

Wider adoption: Scale the solutions across more business scenarios and codify best practices.

5.2 Team Capability Building

Technical deepening: Systematic study of Flink, Spark, and Paimon internals.

Knowledge consolidation: Build comprehensive documentation and multi-scenario case libraries.

Team growth: Elevate overall technical competence to support stream-batch unified processing.

Through deep integration with Paimon, the membership middle-platform warehouse has made the leap from offline batch to real-time streaming, establishing a "stream-batch unified, compute-storage fused" architecture that delivers minute-level response for member profiling and real-time marketing, while leaving room for further optimization under large-scale concurrency.

Tags: Big Data, Flink, Streaming, Data Warehouse, Paimon, Data Lake
Written by 360 Zhihui Cloud Developer

360 Zhihui Cloud is an enterprise open service platform that aims to "aggregate data value and empower an intelligent future," leveraging 360's extensive product and technology resources to deliver platform services to customers.
