
Designing a Real-Time Data System with Flink: Architecture, Data Modeling, and UV Metric Computation

This article outlines a comprehensive real‑time data system built on Apache Flink, covering its application scenarios, layered architecture, data model stratification, construction methods, and a concrete Flink SQL example for calculating UV metrics from Kafka‑sourced page‑view data.

DataFunTalk

Introduction

The rapid growth of internet services and the emergence of 5G have heightened the need for timely data processing in industries such as industrial IoT and e‑commerce. This article presents a complete solution for building a real‑time data system, divided into three parts: Flink application scenarios, overall architecture and data model, and a concrete Flink‑SQL UV calculation case.

1. Flink Real‑Time Application Scenarios

Flink is widely used for four main real‑time workloads: data synchronization, streaming ETL, real‑time analytics, and complex event processing. A diagram (omitted) illustrates typical business cases for each scenario.

2. Real‑Time Data System Architecture

The system consists of five layers: access, storage, compute, platform, and application.

Access Layer: Collects logs (binlog, click‑stream, service logs) via tools and forwards them to Kafka, serving both real‑time and offline pipelines.

Storage Layer: Persists raw and cleaned data in Kafka, HDFS, Kudu, ClickHouse, HBase, Redis, MySQL, etc., according to the unified data‑model stratification.

Compute Layer: Uses Flink, Spark, Presto, and ClickHouse native compute. Flink handles data sync, streaming ETL, and second‑level metrics; Spark SQL serves near‑real‑time multi‑dimensional analytics; Presto/ClickHouse support ad‑hoc queries.

Platform Layer: Provides unified query services, metadata & metric management, and data quality & lineage monitoring.

Application Layer: Supports real‑time dashboards, data products, OLAP, and feature services.

Platform Layer Details

Unified query service enables SQL‑style access to KV stores such as Redis and HBase.

Metadata & metric management standardizes table naming, field semantics, and centralizes metric definitions.

Data quality & lineage consists of platform monitoring (task status, Kafka lag) and data monitoring (ETL anomalies, lambda‑vs‑kappa metric validation).
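The lambda-vs-kappa metric validation mentioned above boils down to comparing the same metric produced by the batch path and the streaming path and flagging divergence. A minimal sketch of that check, assuming a relative-tolerance rule (the class and method names, and the tolerance policy, are invented for illustration and not from the article):

```java
/**
 * Illustrative sketch: flag a metric whose streaming (kappa) value drifts
 * beyond a relative tolerance from the batch (lambda) value.
 */
public class MetricValidator {

    // True when the streaming value is within `tolerance` (relative) of the batch value.
    public static boolean withinTolerance(long batchValue, long streamValue, double tolerance) {
        if (batchValue == 0) {
            return streamValue == 0;
        }
        double relativeDiff = Math.abs(batchValue - streamValue) / (double) batchValue;
        return relativeDiff <= tolerance;
    }

    public static void main(String[] args) {
        // Example: daily UV from the offline path vs. the real-time path.
        System.out.println(withinTolerance(10000, 9990, 0.01)); // small drift passes
        System.out.println(withinTolerance(10000, 9000, 0.01)); // large drift fails
    }
}
```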

2.2 Real‑Time Data Model Layering

The model is split into four layers for real‑time warehouses (ODS, DWD, DIM, DM), each with distinct storage and processing characteristics.

ODS Layer: Stores raw, lightly cleaned data from binlog, click‑stream, and application logs. Data is ingested into Kafka and optionally persisted to HDFS/Kudu for downstream use.

DWD Layer: Provides detailed, fact‑level tables after simple streaming ETL, stored in Kafka and simultaneously written to Hive in 5‑minute partitions so that both real‑time and offline pipelines can consume the same data.

DIM Layer: Holds shared dimension tables using MySQL, HBase, or Redis depending on size, update frequency, and query QPS requirements.

DM Layer: Offers two aggregation levels: a lightweight summary layer (minute‑level latency) for complex OLAP queries, and a high‑level summary layer (second‑level latency) for simple KV lookups such as real‑time dashboards.
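The DIM‑layer store is chosen by table size, update frequency, and lookup QPS. As a rough illustration of that decision, the heuristic below picks a store from row count and query QPS (the thresholds are invented for this sketch, not from the article; real cutoffs depend on the deployment):

```java
/**
 * Illustrative heuristic for choosing a DIM-layer store.
 * Thresholds are hypothetical placeholders.
 */
public class DimStoreSelector {

    public static String chooseStore(long rowCount, long queryQps) {
        if (queryQps > 50_000) {
            return "Redis";   // very hot lookups: in-memory KV
        } else if (rowCount > 10_000_000) {
            return "HBase";   // large dimension tables: scalable KV
        } else {
            return "MySQL";   // small, infrequently updated dimensions
        }
    }
}
```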

2.3 Construction Methods

Two approaches are compared: pure real‑time (streaming engine, second‑level latency) and near‑real‑time (ETL + OLAP, minute‑level latency). Real‑time has no scheduling overhead but lower tolerance for late data; near‑real‑time offers higher flexibility and better handling of late arrivals.

2.4 Evolution of Stream‑Batch Unified Architecture

The article traces the progression from traditional offline warehouses to Lambda, Kappa, and finally stream‑batch unified architectures, emphasizing the goal of achieving low‑latency, SQL‑friendly analytics with a single engine.

3. Flink SQL Real‑Time UV Metric Calculation

A practical case demonstrates how to compute unique visitors (UV) from page‑view events stored in Kafka.

3.1 Kafka Source Deserialization

public class PageViewDeserializationSchema implements DeserializationSchema<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);
    protected SimpleDateFormat dayFormatter;
    private final RowTypeInfo rowTypeInfo;

    public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo) {
        dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
        this.rowTypeInfo = rowTypeInfo;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        Row row = new Row(rowTypeInfo.getArity());
        MobilePage mobilePage = null;
        try {
            mobilePage = MobilePage.parseFrom(message);
            String mid = mobilePage.getMid();
            row.setField(0, mid);
            Long timeLocal = mobilePage.getTimeLocal();
            String logDate = dayFormatter.format(timeLocal);
            row.setField(1, logDate);
            row.setField(2, timeLocal);
        } catch (Exception e) {
            String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";
            // note: new String(message), not message.toString(), to log readable payload
            LOG.error("error parse bytes payload is {}, pageview error is {}",
                    new String(message), mobilePageError, e);
            return null; // skip malformed records instead of failing the job
        }
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return rowTypeInfo;
    }
}
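The deserializer buckets each event into a day string (`yyyyMMdd`) that later serves as the HBase rowkey. The minimal sketch below reproduces just that conversion; unlike the article's code, it pins the time zone to UTC so the output is reproducible (the original uses the JVM default zone):

```java
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

/** Standalone demo of the log_date bucketing used by the deserializer. */
public class DayBucketDemo {

    public static String toLogDate(long epochMillis) {
        SimpleDateFormat dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
        dayFormatter.setTimeZone(TimeZone.getTimeZone("UTC")); // pinned for reproducibility
        return dayFormatter.format(epochMillis);
    }

    public static void main(String[] args) {
        System.out.println(toLogDate(0L));                  // 19700101
        System.out.println(toLogDate(1_577_923_200_000L));  // 20200102 (UTC)
    }
}
```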

3.2 Flink Job Main Program

public class RealtimeUV {
    public static void main(String[] args) throws Exception {
        // step1: load Kafka, HBase, checkpoint configs
        Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);
        String topic = config.get("source.kafka.topic");
        String groupId = config.get("source.group.id");
        String sourceBootStrapServers = config.get("source.bootstrap.servers");
        String hbaseTable = config.get("hbase.table.name");
        String hbaseZkQuorum = config.get("hbase.zk.quorum");
        String hbaseZkParent = config.get("hbase.zk.parent");
        int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));
        int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // step2: checkpoint settings
        sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class, ProtobufSerializer.class);
        sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);
        sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);
        sEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // step3: Blink planner & TableEnvironment
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);
        tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));

        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);
        sourceProperties.setProperty("auto.commit.interval.ms", "3000");
        sourceProperties.setProperty("group.id", groupId);

        // step4: register Kafka table source
        TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();
        Optional<String> proctimeAttribute = Optional.empty();
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();
        Map<String, String> fieldMapping = new HashMap<>();
        List<String> columnNames = new ArrayList<>();
        RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
        columnNames.addAll(Arrays.asList(schema.getFieldNames()));
        columnNames.forEach(name -> fieldMapping.put(name, name));
        PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema(rowTypeInfo);
        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        Kafka011TableSource kafkaTableSource = new Kafka011TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                Optional.of(fieldMapping),
                topic,
                sourceProperties,
                deserializationSchema,
                StartupMode.EARLIEST,
                specificOffsets);
        tEnv.registerTableSource("pageview", kafkaTableSource);

        // step5: register HBase sink
        HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();
        hBaseTableSchema.setRowKey("log_date", String.class);
        hBaseTableSchema.addColumn("f", "UV", Long.class);
        HBaseOptions hBaseOptions = HBaseOptions.builder()
                .setTableName(hbaseTable)
                .setZkQuorum(hbaseZkQuorum)
                .setZkNodeParent(hbaseZkParent)
                .build();
        HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder()
                .setBufferFlushMaxRows(1000)
                .setBufferFlushIntervalMillis(1000)
                .build();
        HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);
        tEnv.registerTableSink("uv_index", hBaseSink);

        // step6: UV aggregation SQL (simple GROUP BY)
        String uvQuery = "insert into uv_index "
                + "select log_date, ROW(uv) "
                + "from (select log_date, count(distinct mid) as uv "
                + "from pageview group by log_date) t";
        tEnv.sqlUpdate(uvQuery);

        // step7: execute job (tEnv triggers the buffered sqlUpdate statements)
        tEnv.execute("UV Job");
    }
}

The case demonstrates that once the Kafka source is deserialized into Flink rows and the table sink is registered, a concise SQL statement can compute UV in real time with minimal code.
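Conceptually, the `count(distinct mid) ... group by log_date` query maintains a set of device IDs per day and emits the set size as the UV, which the HBase sink then upserts. A plain‑Java sketch of that semantics (the class name is hypothetical; Flink keeps this state internally and expires it per the idle‑state retention configured above):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of what the UV SQL computes per log_date. */
public class UvCounter {

    private final Map<String, Set<String>> midsPerDay = new HashMap<>();

    // Records one page view and returns the updated UV for that day,
    // mirroring the upsert the HBase sink performs on each change.
    public long add(String logDate, String mid) {
        Set<String> mids = midsPerDay.computeIfAbsent(logDate, d -> new HashSet<>());
        mids.add(mid);
        return mids.size();
    }
}
```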

Conclusion

By combining a well‑designed layered architecture, unified data modeling, and Flink’s streaming SQL capabilities, organizations can achieve second‑level real‑time analytics while maintaining a clear separation between raw, detailed, dimensional, and aggregated data, supporting both operational dashboards and complex OLAP queries.

Tags: Flink · SQL · Streaming · Kafka · Real-time Data · Data Architecture · UV Metric
Written by

DataFunTalk

Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.
