Design and Implementation of a Real-time and Offline Integrated Query System
This article details the requirements, architecture, and implementation of a real-time and offline integrated query system, covering data ingestion via Debezium and Confluent Platform, storage in Kudu and HDFS, query engines Presto and Kylin, and strategies for data synchronization, partitioning, and scaling.
Flow Query Requirements
Based on terabyte‑level online data, the system supports detailed bill queries similar to bank statements, allowing multi‑year searches with dimensions such as billing period, arrears status, date range, fee type, house category, project, contract information, and statistical columns.
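A representative detail query can be sketched as follows (the table and column names such as tb_acct_item, fee_type, and proj_id are illustrative, not taken from the production schema):

```sql
-- Illustrative bill-detail query: filter by billing period, arrears
-- status, and date range, then aggregate per project and fee type.
SELECT proj_id,
       fee_type,
       COUNT(*)      AS bill_count,
       SUM(amount)   AS total_amount,
       SUM(owed_amt) AS total_arrears
FROM tb_acct_item
WHERE billing_cycle BETWEEN '2019-01' AND '2021-12'
  AND arrears_status = 'OWED'
  AND create_date >= '2019-01-01'
GROUP BY proj_id, fee_type;
```

Queries like this combine selective filters on several dimensions with wide aggregations, which is exactly the workload that strains a row-oriented OLTP store at terabyte scale.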
What Is Real-time Data
Real-time data spans real-time collection, computation, and low‑latency output: data captured directly from source systems, plus the intermediate and result data produced by real-time calculations, whose value decays as time passes.
Instant Query System
Long‑term, non‑archivable data such as rental, utilities, and property management fees pose challenges for lightweight databases like MySQL due to growing size and complex aggregations. To meet query needs without overhauling OLTP systems, an OLAP‑focused instant query solution was introduced.
Architecture Implementation
Data Real-time Sync – Debezium
Debezium captures changes from MySQL and streams them to Kafka via Kafka Connect, providing low‑latency, reliable data replication.
Confluent Platform
The Confluent Platform builds on Kafka, offering scalable, fault‑tolerant data integration. It includes Kafka Connect for connectors, Schema Registry for Avro serialization, and a custom data integration service for monitoring.
Kudu‑Connector
A lightweight Kudu connector (source code: https://github.com/dengbp/big-well) was developed to avoid dependencies on Impala and Hive, handling table synchronization and configuration.
# source connector
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
"name": "test_data-bill_rating_rule_def-connector-souces",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "platform",
"database.password": "platform@0708",
"database.server.id": "169798195",
"database.server.name": "test_data_connector",
"database.whitelist": "test_data",
"table.whitelist": "test_data.bill_rating_rule_def",
"database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"include.schema.changes": "true",
"database.history.skip.unparseable.ddl": "true",
"decimal.handling.mode": "string",
"event.deserialization.failure.handling.mode": "ERROR"
}
}'

# sink connector
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
"name": "test_data-bill_rating_rule_def-connector-sink-49",
"config": {
"connector.class": "com.yr.connector.KuduSinkConnector",
"tasks.max": "16",
"topics": "test_data_connector.test_data.bill_rating_rule_def",
"topic.table.map": "test_data_connector.test_data.bill_rating_rule_def:bill_rating_rule_def",
"table.list": "bill_rating_rule_def",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"kudu.masters": "kudu167:7051,cdh6-slave1:7051,cmhk-b-sl-236:7051,cdh6-slave2:7051,cdh6-slave3:7051,cdh6-slave4:7051,cdh6-slave5:7051,cdh6-slave6:7051,cdh6-slave7:7051,cdh6-slave8:7051,cdh6-slave9:7051",
"max.retries": "3",
"retry.backoff.ms": "1000",
"behavior.on.exception": "FAIL",
"linger.ms": "1000",
"batch.size": "5000",
"max.buffered.records": "8000",
"flush.timeout.ms": "6000",
"offset.flush.timeout.ms": "5000"
}
}'

Real-time Data Warehouse – Kudu
Kudu provides distributed columnar storage with row‑level updates, suitable for OLAP workloads and tightly integrated with other big‑data frameworks. Tables are partitioned by hash (ID, type) and range (creation time) to ensure balanced distribution and efficient queries.
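The partitioning strategy can be sketched with Impala-compatible DDL (shown only to illustrate the layout; the production tables were created through the lightweight connector rather than Impala, and the column names and partition counts here are assumptions):

```sql
-- Hash on (id, fee_type) spreads writes evenly across tablets;
-- range on create_date confines time-bounded scans to a few tablets.
CREATE TABLE tb_acct_item (
  id          BIGINT,
  fee_type    INT,
  create_date TIMESTAMP,
  amount      DECIMAL(16,2),
  PRIMARY KEY (id, fee_type, create_date)
)
PARTITION BY HASH (id, fee_type) PARTITIONS 16,
RANGE (create_date) (
  PARTITION '2019-01-01' <= VALUES < '2020-01-01',
  PARTITION '2020-01-01' <= VALUES < '2021-01-01'
)
STORED AS KUDU;
```

Combining hash and range partitioning avoids the hot-tablet problem of pure range partitioning on time while still letting date-filtered queries prune most tablets.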
Query Engine – Presto
Presto offers near‑standard SQL support without relying on Hive metastore, enabling quick deployment and compatibility with existing MySQL syntax. Custom JDBC configurations allow multiple data sources, though some MySQL‑specific syntax required adjustments.
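A federated query through Presto might look like the following (the catalog names kudu and mysql, the schemas, and the table names are illustrative assumptions):

```sql
-- Join hot data in Kudu with reference data still in MySQL,
-- addressed as catalog.schema.table from a single Presto session.
SELECT b.bill_id, b.amount, p.proj_name
FROM kudu.default.tb_acct_item AS b
JOIN mysql.test_data.tb_project AS p
  ON b.proj_id = p.proj_id
WHERE b.create_date >= DATE '2021-01-01';
```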
Expand Business Coverage
The system evolved from a single bill‑detail query to a platform‑wide instant query service handling TB‑scale OLAP data, scaling from a dozen to over thirty nodes across Confluent Platform, Kudu, Presto, and Zeppelin.
Big Data Requirements (Phase 2)
Beyond billing, the asset leasing service needs analytics on complaints, satisfaction surveys, utilities usage, equipment failures, and competitive intelligence gathered via web crawling.
Technical Architecture of Real-time Offline Integrated System
The architecture combines Kafka‑based real‑time ingestion, ETL to Kudu/MySQL for immediate use, and Hive‑based ODS for offline analysis, followed by pre‑computed results stored in DW or APP layers. Pre‑computation uses Apache Kylin with a custom Kudu storage module.
Pre‑computation Scheme (Kylin+Kudu)
Kylin pre‑computes frequent queries, storing results in Kudu via a self‑developed storage‑kudu module, leveraging Spark as the build engine and Hive as the data source.
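Kylin pre-computes cube-shaped aggregates such as the following (dimension and measure names are illustrative); at query time the answer is read back from the pre-built cuboid stored in Kudu instead of scanning the fact table:

```sql
-- Typical cuboid query: grouped aggregates over a few
-- low-cardinality dimensions, answered from pre-computed results.
SELECT proj_id, fee_type, billing_cycle,
       SUM(amount) AS total_amount,
       COUNT(*)    AS bill_count
FROM tb_uhome_acct_item
GROUP BY proj_id, fee_type, billing_cycle;
```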
Unified Data Access Entry
To avoid duplicate storage, real‑time Kudu tables are exposed to Hive through external tables, enabling SparkSQL, Kylin, and HQL to query a unified data layer.
CREATE VIEW tb_uhome_acct_item_view AS
SELECT ... FROM tb_uhome_acct_item WHERE create_date >= "2017-01-01"
UNION ALL
SELECT ... FROM tb_uhome_acct_item_hdfs WHERE create_date < "2017-01-01"

Hive‑Kudu Metadata Integration
Custom Hive storage handlers (KuduStorageHandler, KuduSerDe, etc.) map Kudu tables to Hive, ensuring metadata consistency and enabling CRUD operations via Hive.
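Mapping a Kudu table into Hive with the custom handler can be sketched as follows (the fully qualified class name and the TBLPROPERTIES keys are assumptions; the exact names depend on the KuduStorageHandler implementation):

```sql
-- Expose the Kudu table to Hive so SparkSQL, Kylin, and HQL
-- all read the same underlying data.
CREATE EXTERNAL TABLE tb_uhome_acct_item
STORED BY 'com.yr.hive.kudu.KuduStorageHandler'
TBLPROPERTIES (
  'kudu.table_name'       = 'tb_uhome_acct_item',
  'kudu.master_addresses' = 'kudu167:7051,cdh6-slave1:7051'
);
```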
Transparent Data Layer Storage
Data resides in both Kudu (hot) and HDFS (cold). Views combine hot and cold partitions to provide seamless query results across storage tiers.
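From the application's point of view, hot and cold data are reached through the single view shown earlier, so the tier boundary is invisible:

```sql
-- One query spans Kudu (hot) and HDFS (cold) transparently:
-- the view routes each date range to the right storage tier.
SELECT *
FROM tb_uhome_acct_item_view
WHERE create_date BETWEEN '2016-06-01' AND '2017-06-30';
```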
Future Outlook
Extend metadata services to support more storage engines.
Implement end‑to‑end latency monitoring for Kafka topics.
Further integrate Hive with Kudu for full DDL support.
Enhance Kylin with dynamic cube generation based on user‑defined dimensions and measures.