Building a Real-Time TB-Scale Bill Query System with Kafka, Kudu, and Presto
This article details the design and implementation of a real‑time, TB‑scale bill‑detail query platform that leverages Kafka for streaming, Debezium and Confluent Platform for change capture, Kudu for low‑latency storage, and Presto/Kylin for fast OLAP queries, while outlining deployment, integration, and future enhancements.
Bill Query Requirement
Working on TB‑scale online data, the system must support detailed bill‑payment queries similar to bank‑statement lookups. Users search across multiple years and filter on dimensions such as billing period, arrears status, date range, fee type, house category, project, contract information, and statistical columns.
What Is Real‑Time Data?
Real‑time data includes real‑time collection, real‑time computation, and low‑latency output. It is captured directly from source systems, processed immediately, and loses relevance as time passes.
Instant Query System
Bill data (rent, utilities, property fees) often has no fixed retention period, so the MySQL tables grow without bound, and complex queries against them can exhaust database memory. A conventional OLTP/OLAP separation was judged impractical in this setting. The solution is an instant query system that provides OLAP‑only query capability without affecting the existing business systems.
Technical Requirements
Support arbitrary combinations of more than 70 query conditions on bill details, over ranges spanning multiple years.
Allow complex joins, including multi‑table left joins.
Minimize SQL changes on the business side by supporting standard SQL.
Ensure real‑time synchronization for frequently changing business data.
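Supporting arbitrary filter combinations usually comes down to building one parameterized statement from whichever conditions the user supplied. The sketch below is illustrative only, not the system's actual query layer, and it makes several assumptions:

- It uses a hypothetical `build_bill_query` helper.
- It relies on a small whitelist of filterable columns. The names are borrowed from the bill table later in this article; the real system has 70+.
- It uses qmark-style `?` placeholders.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical whitelist; the real system exposes 70+ filterable columns.
ALLOWED_EQ = {"COMMUNITY_ID", "STAGE_ID", "FEE_ITEM_TYPE_ID", "HOUSE_STATUS", "BILLING_CYCLE"}
ALLOWED_RANGE = {"create_date"}


def build_bill_query(filters: Dict[str, Any],
                     table: str = "tb_uhome_acct_item") -> Tuple[str, List[Any]]:
    """Turn any combination of filters into one parameterized SELECT.

    Equality filters map a column to a value; range filters map a column to a
    (low, high) tuple where either bound may be None (low inclusive, high exclusive).
    """
    clauses: List[str] = []
    params: List[Any] = []
    for column in sorted(filters):  # sorted keys give deterministic SQL text
        value = filters[column]
        if column in ALLOWED_EQ:
            clauses.append(f"{column} = ?")
            params.append(value)
        elif column in ALLOWED_RANGE:
            low, high = value
            if low is not None:
                clauses.append(f"{column} >= ?")
                params.append(low)
            if high is not None:
                clauses.append(f"{column} < ?")
                params.append(high)
        else:
            # Reject unknown names instead of splicing user input into SQL.
            raise ValueError(f"unsupported filter column: {column}")
    sql = f"SELECT * FROM {table}"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    return sql, params
```

Rejecting unknown column names, rather than interpolating them, keeps a free-form filter UI from becoming an SQL injection vector. Values always travel as bound parameters.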
Architecture Overview
Data Real‑Time Sync – Debezium & Confluent Platform
Debezium: captures MySQL row changes from the binlog with low latency and streams them to Kafka through Kafka Connect, which provides durability, reliability, and fault tolerance.
Confluent Platform: provides Kafka Connect for scalable data integration and Schema Registry for Avro serialization, and offers a commercial monitoring UI (Confluent Control Center). A custom Data Integration Service was built to provide monitoring and operational features.
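Debezium names each MySQL table's topic `<database.server.name>.<database>.<table>`. This is why a source connector with `database.server.name` set to `test_data_connector` produces the topic `test_data_connector.test_data.bill_rating_rule_def` that the sink connector subscribes to in the configuration.

The minimal sketch below also decodes a change-event value. It assumes the value arrives as JSON text (the sink uses `StringConverter`) in Debezium's standard envelope with `op`, `before`, and `after` fields. The helper names are hypothetical.

```python
import json
from typing import Any, Dict, Optional, Tuple


def debezium_topic(server_name: str, database: str, table: str) -> str:
    """Debezium MySQL topics follow <database.server.name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def decode_change(raw_value: Optional[str]) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Return (op, row) for a JSON change event, or None for a tombstone.

    op is Debezium's code: 'c' create, 'u' update, 'd' delete, 'r' snapshot read.
    Deletes carry the row image in 'before'; all other ops carry it in 'after'.
    """
    if raw_value is None:  # tombstone record that follows a delete
        return None
    event = json.loads(raw_value)
    # With JsonConverter schemas enabled the envelope sits under "payload".
    payload = event.get("payload", event)
    op = payload["op"]
    row = payload["before"] if op == "d" else payload["after"]
    return op, row
```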
Kudu Connector
A lightweight Kudu sink connector was implemented (source: https://github.com/dengbp/big-well) to avoid heavy dependencies on Impala and Hive. It syncs tables into Kudu according to configured rules.
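The article does not show the connector's internals. As a rough model of what a Kafka‑to‑Kudu sink has to do, the sketch below first parses a `topic.table.map` value. Only a single pair appears in the configuration, so it assumes comma-separated `topic:table` pairs. It then replays decoded change events as upserts and deletes. Plain dicts stand in for Kudu tables, and the primary-key column name `id` is a hypothetical default.

```python
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]


def parse_topic_table_map(spec: str) -> Dict[str, str]:
    """Parse 'topicA:tableA,topicB:tableB' (the comma separator is an assumption)."""
    mapping: Dict[str, str] = {}
    for entry in filter(None, (e.strip() for e in spec.split(","))):
        topic, table = entry.split(":", 1)
        mapping[topic] = table
    return mapping


def apply_events(events: Iterable[Tuple[str, str, Row]], mapping: Dict[str, str],
                 key: str = "id") -> Dict[str, Dict[Any, Row]]:
    """Replay (topic, op, row) events into dicts standing in for Kudu tables.

    Creates, updates and snapshot reads become upserts keyed by the primary key;
    deletes remove the key if present.
    """
    tables: Dict[str, Dict[Any, Row]] = {}
    for topic, op, row in events:
        target = tables.setdefault(mapping[topic], {})
        if op == "d":
            target.pop(row[key], None)
        else:  # 'c', 'u', 'r'
            target[row[key]] = row
    return tables
```

Mapping creates, updates, and snapshot reads to upserts (Kudu supports an UPSERT operation) makes redelivery after a task restart harmless. This matters because Kafka Connect sinks receive records with at-least-once delivery.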
Source and Sink Connector Parameters
# Source connector (Debezium MySQL connector, MySQL -> Kafka):
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
"name": "test_data-bill_rating_rule_def-connector-souces",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "platform",
"database.password": "platform@0708",
"database.server.id": "169798195",
"database.server.name": "test_data_connector",
"database.whitelist": "test_data",
"table.whitelist": "test_data.bill_rating_rule_def",
"database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"include.schema.changes": "true",
"database.history.skip.unparseable.ddl": "true",
"decimal.handling.mode": "string",
"event.deserialization.failure.handling.mode": "ERROR"
}
}'
# Sink connector (custom KuduSinkConnector, Kafka -> Kudu):
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
"name": "test_data-bill_rating_rule_def-connector-sink-49",
"config": {
"connector.class": "com.yr.connector.KuduSinkConnector",
"tasks.max": "16",
"topics": "test_data_connector.test_data.bill_rating_rule_def",
"topic.table.map": "test_data_connector.test_data.bill_rating_rule_def:bill_rating_rule_def",
"table.list": "bill_rating_rule_def",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"kudu.masters": "kudu167:7051,cdh6-slave1:7051,cmhk-b-sl-236:7051,cdh6-slave2:7051,cdh6-slave3:7051,cdh6-slave4:7051,cdh6-slave5:7051,cdh6-slave6:7051,cdh6-slave7:7051,cdh6-slave8:7051,cdh6-slave9:7051",
"max.retries": "3",
"retry.backoff.ms": "1000",
"behavior.on.exception": "FAIL",
"linger.ms": "1000",
"batch.size": "5000",
"max.buffered.records": "8000",
"flush.timeout.ms": "6000",
"offset.flush.timeout.ms": "5000"
}
}'
Real‑Time Warehouse – Kudu
Kudu provides distributed columnar storage with row‑level updates, suitable for OLAP workloads and tightly integrated with other big‑data frameworks. Bill tables are partitioned by hash on ID and type, and range‑partitioned by creation time; related tables use similar hash‑range schemes.
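With multilevel partitioning, Kudu places each row in one hash bucket (derived from the hash key columns) and one range partition (derived from the range key). Each bucket-range combination is a separate tablet.

The sketch below models that routing in Python under three assumptions:

- 16 hash buckets.
- Yearly ranges on the creation date.
- `zlib.crc32` as a stand-in for Kudu's internal hash function.

Because of the stand-in hash, the bucket numbers will not match a real cluster.

```python
import zlib
from datetime import date
from typing import Tuple

HASH_BUCKETS = 16  # hypothetical bucket count


def hash_bucket(bill_id: int, bill_type: str, buckets: int = HASH_BUCKETS) -> int:
    """Deterministic bucket for the (ID, type) hash key.

    crc32 stands in for Kudu's own hash; only the idea (stable hash of the key
    columns modulo the bucket count) carries over.
    """
    key = f"{bill_id}|{bill_type}".encode("utf-8")
    return zlib.crc32(key) % buckets


def range_partition(create_date: date) -> str:
    """Yearly range label: '2019' covers [2019-01-01, 2020-01-01)."""
    return str(create_date.year)


def tablet_for(bill_id: int, bill_type: str, create_date: date) -> Tuple[int, str]:
    # Every (hash bucket, range partition) pair corresponds to one tablet.
    return hash_bucket(bill_id, bill_type), range_partition(create_date)
```

Hashing on ID and type spreads concurrent writes across tablet servers. The time range lets queries on a date window skip whole partitions and lets old ranges be moved out as a unit.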
Query Engine – Presto for Sub‑Second Response
Presto is simple to deploy, does not need a Hive Metastore to query Kudu, and supports near‑standard SQL, which keeps changes on the business side small. Some MySQL‑specific syntax still required custom extensions to the Presto JDBC driver.
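The article does not list which MySQL syntax needed extending or how the JDBC driver was changed. As a hypothetical illustration of the kind of translation involved, the sketch below rewrites two common MySQL‑isms into forms Presto accepts:

- Backtick-quoted identifiers become ANSI double-quoted identifiers.
- `IFNULL(a, b)` becomes `COALESCE(a, b)`.

It is plain text substitution, not a SQL parser.

```python
import re

# Hypothetical rewrite rules. They operate on raw SQL text and do not skip
# string literals, so a backtick or "IFNULL(" inside a quoted value is also rewritten.
_BACKTICK_IDENT = re.compile(r"`([^`]*)`")
_IFNULL = re.compile(r"\bIFNULL\s*\(", re.IGNORECASE)


def mysql_to_presto(sql: str) -> str:
    """Rewrite two common MySQL-isms into ANSI forms that Presto accepts."""
    sql = _BACKTICK_IDENT.sub(r'"\1"', sql)  # `col` -> "col"
    sql = _IFNULL.sub("COALESCE(", sql)       # IFNULL(a, b) -> COALESCE(a, b)
    return sql
```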
Expanding Business Coverage
The system evolved from a simple bill‑detail query to a platform‑wide instant query solution handling TB‑scale OLAP data across more than thirty nodes.
Big Data Demand – Phase Two
Beyond bill analytics, the asset‑leasing service needs precise marketing, competitor data crawling, and comprehensive analytics (customer complaints, satisfaction surveys, utility usage, equipment failures).
Real‑Time & Offline Integrated System Architecture
Data Flow
Real‑Time & Offline Integration Access
Data sources include internal MySQL databases, app user‑trajectory logs, competitor data crawled from external websites, and log files. Real‑time data that needs only light cleaning bypasses the ODS layer and is written directly to Kudu or the application layer.
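The routing decision can be sketched as a small dispatcher. The article does not specify which sources count as needing only light cleaning, so several parts of the sketch are hypothetical:

- The source labels `mysql_binlog` and `app_trajectory`.
- The cleaning rule: trim strings and map empty strings to `None`.
- Python lists standing in for the Kudu and ODS writers.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Hypothetical labels for sources whose records only need light, per-record cleaning.
LIGHT_CLEAN_SOURCES = {"mysql_binlog", "app_trajectory"}


def light_clean(record: Dict[str, Any]) -> Dict[str, Any]:
    """Per-record cleaning only: trim strings and turn empty strings into None."""
    cleaned: Dict[str, Any] = {}
    for k, v in record.items():
        if isinstance(v, str):
            v = v.strip() or None
        cleaned[k] = v
    return cleaned


@dataclass
class Router:
    kudu: List[Dict[str, Any]] = field(default_factory=list)  # real-time path
    ods: List[Dict[str, Any]] = field(default_factory=list)   # staged for offline ETL

    def route(self, source: str, record: Dict[str, Any]) -> str:
        if source in LIGHT_CLEAN_SOURCES:
            self.kudu.append(light_clean(record))  # skip ODS, write straight through
            return "kudu"
        self.ods.append(record)  # everything else goes through the full ETL path
        return "ods"
```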
Hive‑Kudu Metadata Integration
To make Kudu tables visible to Hive, Hive external tables are created with KuduStorageHandler, KuduSerDe, and related classes. Because the table metadata then lives in the Hive Metastore, Hive, Spark SQL, and Kylin can all query the same data through one catalog.
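For reference, a Hive external table over an existing Kudu table is declared with a storage handler and table properties. The Python helper below only generates that DDL. It assumes the class name and property keys used by Apache Hive's own Kudu integration: `org.apache.hadoop.hive.kudu.KuduStorageHandler`, `kudu.table_name`, and `kudu.master_addresses`. These may differ from the classes used in the article's deployment.

```python
from typing import Dict

# Class name from Apache Hive's Kudu integration; the article's setup may use
# its own or back-ported classes instead.
KUDU_HANDLER = "org.apache.hadoop.hive.kudu.KuduStorageHandler"


def hive_kudu_external_table(hive_table: str, kudu_table: str, masters: str) -> str:
    """Build CREATE EXTERNAL TABLE DDL mapping a Hive name onto an existing Kudu table.

    No column list is emitted, on the assumption that the handler derives the
    columns from the Kudu table schema.
    """
    props: Dict[str, str] = {
        "kudu.table_name": kudu_table,
        "kudu.master_addresses": masters,  # comma-separated host:port list
    }
    prop_sql = ",\n  ".join(f'"{k}" = "{v}"' for k, v in props.items())
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {hive_table}\n"
        f"STORED BY '{KUDU_HANDLER}'\n"
        f"TBLPROPERTIES (\n  {prop_sql}\n)"
    )
```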
Transparent Data Layer Storage
Hot, real‑time data lives in Kudu and cold data in HDFS. Data older than a business‑defined retention period is periodically migrated from Kudu to HDFS. Unified views combine the hot and cold partitions so that queries still return complete results.
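The hot/cold split comes down to one cutoff date shared by the migration job and the view. The sketch below assumes yearly range partitions and a retention period expressed in years. For example, keeping four years hot in mid‑2020 yields a cutoff of 2017-01-01, the boundary used in the view in this article. All function names are hypothetical.

```python
from datetime import date
from typing import List, Sequence


def retention_cutoff(today: date, keep_years: int) -> date:
    """First day of the oldest year that stays hot in Kudu."""
    return date(today.year - keep_years + 1, 1, 1)


def partitions_to_migrate(range_partitions: Sequence[int], cutoff: date) -> List[int]:
    """Yearly range partitions lying entirely before the cutoff move to HDFS."""
    return sorted(y for y in range_partitions if y < cutoff.year)


def unified_view_sql(view: str, hot: str, cold: str,
                     columns: Sequence[str], cutoff: date) -> str:
    """UNION ALL the Kudu (hot) and HDFS (cold) tables, split on create_date."""
    cols = ",".join(columns)
    c = cutoff.isoformat()
    return (
        f"CREATE VIEW {view} AS\n"
        f"SELECT {cols} FROM {hot} WHERE create_date >= '{c}'\n"
        f"UNION ALL\n"
        f"SELECT {cols} FROM {cold} WHERE create_date < '{c}'"
    )
```

The two predicates are complementary: `>=` on the hot side and `<` on the cold side. This keeps a row from being counted twice even while a migrated partition still exists in both stores. The safe order of operations is:

1. Copy the partition to HDFS.
2. Move the view's cutoff.
3. Drop the Kudu range.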
Creating a Unified View over Hot and Cold Data
create view tb_uhome_acct_item_view as
SELECT COMMUNITY_ID,STAGE_ID,NAME,UNIT,HOUSE_NAME,BILL_AREA,PAY_USERID,BILLING_CYCLE,FEE_ITEM_TYPE_ID,RULE_NAME,RES_INST_NAME,HOUSE_STATUS_TYPE,HOUSE_STATUS,REAL_CYCLE,CONCAT(BILL_DATE_START, BILL_DATE_END),LEASE_POSITION,OBJ_CODE
FROM tb_uhome_acct_item
WHERE create_date >= '2017-01-01'
UNION ALL
SELECT COMMUNITY_ID,STAGE_ID,NAME,UNIT,HOUSE_NAME,BILL_AREA,PAY_USERID,BILLING_CYCLE,FEE_ITEM_TYPE_ID,RULE_NAME,RES_INST_NAME,HOUSE_STATUS_TYPE,HOUSE_STATUS,REAL_CYCLE,CONCAT(BILL_DATE_START, BILL_DATE_END),LEASE_POSITION,OBJ_CODE
FROM tb_uhome_acct_item_hdfs
WHERE create_date < '2017-01-01';
Future Outlook
Extend the integrated architecture to support more storage engines via Hive Metastore.
Implement end‑to‑end latency monitoring for each Kafka topic.
Further optimize Hive‑Kudu integration and view handling.
Enhance Kylin with dynamic cube building based on user‑defined dimensions and measures.
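For the per‑topic latency item, one common measure is the sink's processing time minus the time the change was made in MySQL. Debezium records the latter as `source.ts_ms` in each event. The sketch below assumes JSON‑encoded events and a caller‑supplied sink timestamp in milliseconds, such as `int(time.time() * 1000)`. The class name is hypothetical.

```python
import json
from collections import defaultdict
from typing import Dict, List


class TopicLatency:
    """Per-topic end-to-end latency: sink time minus the source change time.

    Assumes JSON-encoded Debezium events whose payload.source.ts_ms records when
    the change happened in MySQL. Clock skew between hosts appears directly in
    the measured values.
    """

    def __init__(self) -> None:
        self.samples: Dict[str, List[int]] = defaultdict(list)

    def record(self, topic: str, raw_value: str, sink_ts_ms: int) -> int:
        event = json.loads(raw_value)
        payload = event.get("payload", event)  # schemas enabled or not
        latency = sink_ts_ms - payload["source"]["ts_ms"]
        self.samples[topic].append(latency)
        return latency

    def summary(self, topic: str) -> Dict[str, int]:
        values = sorted(self.samples[topic])
        if not values:
            return {"count": 0, "max_ms": 0, "p50_ms": 0}
        # p50 uses the upper median for even-sized samples.
        return {"count": len(values), "max_ms": values[-1], "p50_ms": values[len(values) // 2]}
```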
This article has been distilled and summarized from source material, then republished for learning and reference.
21CTO
21CTO (21CTO.com) offers developers community, training, and services, making it your go‑to learning and service platform.