Design and Performance Optimization of a Real‑Time Billion‑Scale Data Processing Pipeline
This article reviews the background, architecture, and performance optimizations of a real-time pipeline that processes hundreds of billions of records daily, covering tweaks to consumption, batch processing, storage format, and the execution engine, and presents the resulting resource savings and latency improvements.
Background – The author revisits a previous real-time data collection case built on Logstash and scales it to a dramatically larger daily volume: from ~20 billion to ~500 billion records on the producer side, and from ~1 billion to >50 billion on the output side. The same technology stack (Java, Kafka, MLSQL, Logstash, Ruby, Hive, Elasticsearch, SparkSQL, DataX) is retained, with additional optimizations.
Solution Design
1. Data Flow: Business systems push serialized data to MQ; a streaming platform consumes and partially processes it, then writes it back to MQ; Logstash consumes, applies Ruby logic, and writes to HDFS; the data warehouse loads the HDFS files, models them, and finally stores metric results in Elasticsearch.
2. Technical Stack: Data is serialized with Protobuf and sent to Kafka; MLSQL UDFs de-duplicate every 3 seconds and write back to Kafka; Logstash consumes, calls external APIs (results cached in environment variables to avoid frequent file I/O), and writes to HDFS; Hive loads the data for modeling; DataX moves the final metrics to Elasticsearch.
3. Problem Introduction: The original design monopolized cluster resources when processing >50 billion records nightly, stalling other scheduled jobs.
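The environment-variable cache mentioned in the technical stack can be populated once per Logstash worker process. A minimal Ruby sketch of that pattern follows; the helper `fetch_app_list` is a hypothetical stand-in for the real external API call, and the names are illustrative only:

```ruby
# Minimal sketch of the environment-variable cache (hypothetical names;
# the real filter calls an internal API rather than this stub).
def fetch_app_list
  # Stand-in for the external API call / file read done once per process.
  ["app_a", "app_b"]
end

def cached_app_list
  # Populate ENV only on the first event; later events reuse the cached
  # string, avoiding per-event file or network I/O.
  ENV["app_list"] ||= fetch_app_list.join(" ")
  ENV["app_list"].split(" ")
end
```

Subsequent filter logic can then test membership against this list (or read `ENV["app_list"]` directly, as the consumption-optimization snippet below does) without touching disk on every event.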
Performance Optimizations
1. Consumption Optimization – Replaced file‑based caching with environment‑variable caching to reduce I/O overhead. Example code:
# ENV["app_list"] holds the cached list
if not ENV["app_list"].nil?
  event["arrays"] = ENV["app_list"].split(" ")
  event.cancel if event["arrays"].include? event["service_name"]
end

2. Batch Processing Optimization – Adjusted mapper/reducer counts and switched the output format to ORC with Snappy compression to limit small files and reduce network traffic. Example DDL:
create table if not exists tableA (
  id string,
  field1 string,
  field2 string
)
partitioned by (date_id string, hour string)
stored as orc
tblproperties ("orc.compress" = "SNAPPY");

3. Execution Engine Change – Moved hourly processing from Hive to SparkSQL, enabling dynamic partitioning, adaptive query execution, and a larger broadcast-join threshold (200 MB) to speed up processing and lower resource consumption. Sample Spark settings:
set spark.sql.auto.repartition=true;
set spark.shuffle.service.enabled=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set spark.sql.adaptive.enabled=true;
set spark.sql.autoBroadcastJoinThreshold=209715200;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=1024000000;

Results
After applying the optimizations, peak queue resource usage dropped from 1,200 cores and 4.3 TB of memory to 100 cores and 92 GB (roughly a 12× reduction in cores and ~47× in memory), and job completion time fell from 14,803 seconds to 3,470 seconds, about a 4.3× speedup.
Remaining Issues
Stability concerns persist due to heterogeneous Logstash consumer nodes causing occasional HDFS write delays; the team plans to migrate consumption logic to Flink for better reliability.
Additional references and related articles are listed at the end of the original post.
Published by DataFunSummit, the official account of the DataFun community, which shares big data and AI industry summit news and speaker talks.