How Bilibili Scaled Offline Computing: Migrating from Hive to Spark and Boosting Performance
This article details Bilibili's evolution from a Hadoop‑based offline platform to a Spark‑driven architecture, covering the Hive‑to‑Spark migration, automated SQL conversion, result validation, stability enhancements, performance tuning, meta‑store federation, and future directions for large‑scale data processing.
Background
In 2018 Bilibili built an offline compute platform on Hadoop. The cluster grew from ~200 nodes to nearly 10,000 across multiple data centers. Hive, Spark and Presto were deployed at scale, with roughly 200,000 daily batch jobs running on Spark and Hive.
From Hive to Spark
At the beginning of 2021 Hive executed >80% of offline jobs while Spark 2.4 accounted for ~20%. After Spark 3.1 was released, Bilibili migrated Hive‑SQL workloads to Spark‑SQL to exploit the higher performance of Spark 3.1.
The migration was driven by the internal BSK scheduler (covering >80% of jobs) and Airflow submissions. Early manual migrations rewrote user SQL, ran both engines in parallel, and used a result‑comparison tool for correctness. This workflow later evolved into an automated migration tool.
SQL Conversion
The team rewrote SparkSqlParser to replace input and output tables in collected SQL, preserving DAG‑level dependencies. SELECT statements were transformed into CTAS statements so that results could be materialised for comparison; column names were encoded to avoid failures caused by Chinese characters. DDL statements that did not require computation were skipped and executed on Hive to prevent unintended metadata changes.
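The rewrite itself lives in the modified SparkSqlParser, which is not reproduced here. As a rough illustration of the three transformations described above (table replacement, SELECT-to-CTAS wrapping, and column-name encoding), the following Python sketch uses regular expressions instead of a real parser. The function names, the shadow-table mapping, and the hex encoding scheme are all assumptions made for this example.

```python
import re


def encode_column(name):
    """Encode a non-ASCII column name as hex (hypothetical scheme)."""
    if name.isascii():
        return name
    return "c_" + name.encode("utf-8").hex()


def rewrite_tables(sql, mapping):
    """Replace whole table identifiers using a source -> shadow mapping.

    A regex stands in for the parser-based rewrite, so it does not
    understand comments or string literals.
    """
    for src, dst in mapping.items():
        pattern = rf"(?<![\w.]){re.escape(src)}(?![\w.])"
        sql = re.sub(pattern, lambda m, d=dst: d, sql)
    return sql


def select_to_ctas(select_sql, result_table):
    """Wrap a SELECT in CREATE TABLE ... AS so its result is materialised."""
    body = select_sql.strip().rstrip(";")
    return f"CREATE TABLE {result_table} AS {body}"
```

A dual-run job would apply `rewrite_tables` first, so that the Spark run reads and writes shadow tables, and then `select_to_ctas` for bare queries whose output needs to be compared.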
Result Comparison
Dual‑run results were first compared at the schema level using DESC. When the schemas matched, a full data comparison followed: each table was grouped on all columns, the two grouped results were combined with UNION ALL, and the union was aggregated again. Groups with a count of 2 indicated identical data in both tables, while any other count highlighted a difference. Container types (LIST, SET, MAP) and non‑deterministic columns (e.g., rand()) required manual analysis.
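The group, union, and re-aggregate comparison can be tried out on a small scale. The sketch below runs it against an in-memory SQLite database from Python. It assumes each per-table grouping also carries a row count, so that duplicate rows are compared by multiplicity; the function name and the `row_cnt` column are illustrative and not taken from Bilibili's tool.

```python
import sqlite3


def diff_rows(conn, table_a, table_b, columns):
    """Return groups that do not appear identically in both tables.

    Each table is grouped on all columns with a row count, the two
    results are combined with UNION ALL, and the union is grouped again.
    A group seen exactly twice is identical on both sides.
    """
    cols = ", ".join(columns)
    query = f"""
        SELECT {cols}, row_cnt, COUNT(*) AS seen
        FROM (
            SELECT {cols}, COUNT(*) AS row_cnt FROM {table_a} GROUP BY {cols}
            UNION ALL
            SELECT {cols}, COUNT(*) AS row_cnt FROM {table_b} GROUP BY {cols}
        )
        GROUP BY {cols}, row_cnt
        HAVING COUNT(*) <> 2
        ORDER BY {cols}, row_cnt
    """
    return conn.execute(query).fetchall()
```

An empty result means the two tables hold the same rows with the same multiplicities; any returned group needs a closer look.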
Resource usage (execution time, CPU, memory) was also measured. The migration yielded >40% reduction in execution time and >30% lower overall resource consumption when moving from Hive to Spark.
Migration & Rollback
Each task went through at least three dual‑run comparisons before it was migrated. After migration, the first three scheduler executions were monitored; if time, CPU or memory regressed compared with the seven‑run pre‑migration average, the task was automatically rolled back and flagged for manual investigation.
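A minimal sketch of the rollback check follows. The source does not say whether the three monitored runs are judged individually or as an average, so this version assumes that any single run exceeding the pre-migration average on any metric triggers a rollback. The metric keys and the `tolerance` parameter are hypothetical.

```python
from statistics import mean

METRICS = ("seconds", "cpu", "memory")


def should_roll_back(baseline_runs, post_runs, tolerance=0.0):
    """Compare post-migration runs with the pre-migration average.

    baseline_runs: the pre-migration runs (seven in the text).
    post_runs: the first scheduler executions after migration (three).
    Returns True when any run regresses on any metric.
    """
    baseline = {m: mean(run[m] for run in baseline_runs) for m in METRICS}
    return any(
        run[m] > baseline[m] * (1 + tolerance)
        for run in post_runs
        for m in METRICS
    )
```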
Spark Practices at Bilibili
Stability Improvements
Small‑File Problem
Bottom‑up file merging: data is first written to a temporary directory; before refreshUpdatedPartitions small files are coalesced and then moved to the final location.
Repartition‑based merging: using Spark 3 AQE, a rebalance hint (similar to repartition) is added for jobs without shuffle, reducing small‑file write amplification.
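How small files are grouped before being coalesced is not described in the source. One plausible approach is a greedy size-based plan, sketched below in Python. The threshold and target-size parameters are assumptions, and the function only plans the groups; it does not touch a file system.

```python
def plan_merges(file_sizes, target_bytes, small_threshold):
    """Group small files so that each group stays near target_bytes.

    Files at or above small_threshold are left alone. The remaining
    files are taken largest first, and a new group is started whenever
    adding the next file would exceed target_bytes.
    """
    small = sorted((s for s in file_sizes if s < small_threshold), reverse=True)
    groups, current, current_size = [], [], 0
    for size in small:
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

Each returned group would become one output file in the temporary directory before the move to the final location.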
Shuffle Stability
Disk tiering: SSD directories are preferred for DiskBlockManager work files; fallback to HDD when SSD space is low, improving shuffle I/O.
Remote Shuffle Service (RSS): a push‑based shuffle reduces random disk reads. A dedicated shuffle‑master node tracks external shuffle nodes, balances load and improves task placement, achieving ~25% shorter runtimes for large jobs.
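The disk-tiering rule above amounts to a short selection function. The sketch below prefers the first SSD directory with enough free space and otherwise falls back to the HDD directory with the most free space. The free-space threshold and the choice of fallback directory are assumptions, not details from the source.

```python
import shutil


def _free_bytes(path):
    """Free space of the file system holding path."""
    return shutil.disk_usage(path).free


def pick_local_dir(ssd_dirs, hdd_dirs, min_free_bytes, free_bytes=_free_bytes):
    """Choose a directory for shuffle work files, SSD first."""
    for directory in ssd_dirs:
        if free_bytes(directory) >= min_free_bytes:
            return directory
    # No SSD has enough room: fall back to the emptiest HDD.
    return max(hdd_dirs, key=free_bytes)
```

The `free_bytes` argument exists only so the rule can be exercised without real disks.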
Driver Memory Management
For ad‑hoc queries that pull massive result sets, the driver monitors memory usage; when usage is high, results are streamed to temporary files and served in batches to avoid OOM.
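The spill-and-serve idea can be illustrated with a small Python class. It is a sketch under simplifying assumptions: a row-count limit stands in for the driver's memory monitoring, and rows are pickled to a temporary file. The class and method names are hypothetical.

```python
import pickle
import tempfile


class SpillableResult:
    """Keep the first rows in memory and spill the rest to a temp file."""

    def __init__(self, max_rows_in_memory):
        self.max_rows = max_rows_in_memory
        self.rows = []
        self.spill = None

    def add(self, row):
        if self.spill is None and len(self.rows) < self.max_rows:
            self.rows.append(row)
            return
        if self.spill is None:
            self.spill = tempfile.TemporaryFile()
        pickle.dump(row, self.spill)

    def _iter_rows(self):
        yield from self.rows
        if self.spill is not None:
            self.spill.seek(0)
            while True:
                try:
                    yield pickle.load(self.spill)
                except EOFError:
                    return

    def batches(self, batch_size):
        """Serve rows in fixed-size batches, reading spilled rows lazily."""
        batch = []
        for row in self._iter_rows():
            batch.append(row)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch
```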
Task Parallelism & Execution Limits
Queue‑based isolation limits per‑SQL task count and total parallelism, dynamically adjusting limits based on current versus max executor counts.
Dangerous Join Detection & Join Inflation
Join strategy analysis flags broadcasts that could OOM the driver as “dangerous joins”. A monitoring thread computes join output‑row inflation by comparing child node row counts, alerting users to potential blow‑ups.
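Both checks reduce to simple ratios. The sketch below assumes that inflation is measured against the larger child and that a broadcast is "dangerous" when its estimated size exceeds a fraction of the driver's free memory. The source does not give either formula, so the definitions and the `safety_factor` default are illustrative.

```python
def join_inflation(left_rows, right_rows, output_rows):
    """Output rows divided by the row count of the larger child."""
    largest_child = max(left_rows, right_rows)
    return output_rows / largest_child if largest_child else 0.0


def is_dangerous_broadcast(estimated_build_bytes, driver_free_bytes,
                           safety_factor=0.5):
    """Flag a broadcast whose build side may not fit in the driver."""
    return estimated_build_bytes > driver_free_bytes * safety_factor
```

A monitoring thread could call `join_inflation` periodically with the live row counters of a join node and alert when the ratio passes a chosen limit.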
Skewed Key Detection
During shuffle fetch, partition sizes are examined; if a partition is identified as skewed, sampled keys are sent to the driver via TaskMetric, recorded, and surfaced in an internal diagnostics platform for user‑driven optimization.
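The source does not state how a partition is judged to be skewed. A common rule, similar in spirit to the one AQE's skew-join optimisation uses, compares each partition with the median size and with an absolute floor. The Python sketch below applies that rule and then summarises sampled keys; the default values and function names are assumptions.

```python
from collections import Counter
from statistics import median


def skewed_partitions(sizes, factor=5.0, min_bytes=256 * 1024 * 1024):
    """Indexes of partitions larger than factor x median and min_bytes."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * med and size > min_bytes]


def top_keys(sampled_keys, n=3):
    """Most frequent keys in a sample taken from a skewed partition."""
    return Counter(sampled_keys).most_common(n)
```

The output of `top_keys` corresponds to what would be reported to the driver and shown to the user.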
Performance Optimizations
DPP & AQE Compatibility
Fixes back‑ported from Spark 3.2 allow Dynamic Partition Pruning (DPP) to work together with Adaptive Query Execution (AQE), delivering noticeable gains on TPC‑DS.
AQE‑Enabled ShuffledHashJoin
AQE now promotes small‑table joins to ShuffledHashJoin when the largest partition size is below a configurable threshold, balancing performance and memory pressure.
Runtime Filter
A dynamic Bloom‑filter pruning rule builds a filter on the small side of a join, broadcasts it, and applies it to the large side before shuffle, reducing processed rows dramatically (e.g., from 12 billion to 30 k rows in a test).
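To make the mechanism concrete, here is a self-contained Bloom filter in Python together with a function that prunes the large side of a join. It is a toy model of the idea, not Spark's implementation; the bit-array size, hash count, and all names are assumptions.

```python
import hashlib


class BloomFilter:
    """A fixed-size Bloom filter with double hashing over SHA-256."""

    def __init__(self, num_bits=1 << 16, num_hashes=4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8 + 1)

    def _positions(self, key):
        digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))


def runtime_filter(small_side_keys, large_side_rows, key_of):
    """Drop large-side rows whose join key cannot match the small side."""
    bloom = BloomFilter()
    for key in small_side_keys:
        bloom.add(key)
    return [row for row in large_side_rows if bloom.might_contain(key_of(row))]
```

A Bloom filter never drops a matching row, but it may keep a few non-matching ones, so the join itself still has to run on the surviving rows.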
Data Skipping
ORC/Parquet min/max statistics and row‑group indexes are leveraged for predicate push‑down. Additional ordering by hot columns further shrank scan volumes from billions to hundreds of thousands of rows.
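The effect of ordering on min/max pruning is easy to reproduce. In the Python sketch below, each row group keeps only its minimum and maximum value, and a range predicate reads only the groups whose range overlaps it. The helper names and the tiny data set are illustrative.

```python
def build_stats(values, group_size):
    """Min/max statistics for consecutive row groups of group_size rows."""
    groups = (values[i:i + group_size] for i in range(0, len(values), group_size))
    return [(min(g), max(g)) for g in groups]


def row_groups_to_read(stats, lo, hi):
    """Row groups whose [min, max] range overlaps the predicate [lo, hi]."""
    return [i for i, (mn, mx) in enumerate(stats) if mx >= lo and mn <= hi]


values = [5, 1, 9, 3, 7, 2, 8, 4, 6, 0]
unsorted_stats = build_stats(values, 5)        # [(1, 9), (0, 8)]
sorted_stats = build_stats(sorted(values), 5)  # [(0, 4), (5, 9)]
```

For the predicate `3 <= x <= 4`, the unsorted layout has to read both row groups, while the sorted layout reads only the first. This is why ordering by hot columns shrinks the scan.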
ZSTD Support
Community patches were back‑ported to enable ZSTD compression in Spark 3.1, and an ORC‑related bug that broke predicate push‑down for long strings was fixed.
Mixed‑Format Reads
DataSourceScanExec was adjusted to resolve readers at the partition level, allowing tables that contain both text and columnar files to be read seamlessly.
Convert & Merge Table Syntax
CONVERT TABLE target=tableIdentifier (convertFormat | compressType) partitionClause? #convertTable
MERGE TABLE target=tableIdentifier partitionClause? #mergeTable
Lineage Extraction
A LineageQueryListener captures logical‑plan expressions, maps exprId to columns, and builds field‑level lineage (PROJECTION/PREDICATE) and hierarchical relationships.
Automatic Parameter Optimization (HBO)
HBO fingerprints SQL jobs, aggregates execution metrics, and suggests parameter tweaks:
Memory: lower memory for jobs with <30% peak‑to‑allocation ratio, raise it otherwise.
Parallelism: adjust spark.dynamicAllocation.executorAllocationRatio and spark.sql.shuffle.partitions based on observed executor utilization.
Shuffle: enable RSS only for shuffle‑heavy jobs.
Small‑file merging: disable when no small files are present.
HBO also drives gradual feature roll‑outs.
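Two of the HBO steps, fingerprinting and the memory rule, can be sketched in a few lines of Python. The normalisation rules, the `headroom` factor, and the function names are assumptions. Only the "lower memory below a 30% peak-to-allocation ratio" branch is modelled, because the source gives no detail on how increases are sized.

```python
import hashlib
import re


def fingerprint(sql):
    """Hash a SQL text after replacing literals and collapsing whitespace,
    so daily runs that differ only in dates share one fingerprint."""
    text = re.sub(r"'[^']*'", "?", sql)
    text = re.sub(r"\b\d+(\.\d+)?\b", "?", text)
    text = re.sub(r"\s+", " ", text).strip().lower()
    return hashlib.sha1(text.encode("utf-8")).hexdigest()


def suggest_lower_memory(allocated_mb, peak_mb_history, low_ratio=0.30,
                         headroom=1.5):
    """Suggest a smaller allocation when peak usage stays under low_ratio.

    Returns None when the job does not qualify for a reduction.
    """
    peak = max(peak_mb_history)
    if peak / allocated_mb < low_ratio:
        return int(peak * headroom)
    return None
```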
Smart Data Manager (SDM)
SDM provides asynchronous table‑format conversion, partition‑level re‑ordering (z‑order), statistics collection, small‑file merging, Hive‑table indexing, and lineage extraction. Operations are coordinated via Hive Metastore locks to ensure transparent execution.
Hive Metastore Optimizations
MetaStore Federation
To avoid cross‑data‑center metadata latency, Bilibili evaluated two solutions: the open‑source WaggleDance project (https://github.com/ExpediaGroup/waggle-dance) and an in‑house HMS Federation. Federation was chosen for its fine‑grained migration capabilities and simpler operations.
Request Tracing & Traffic Control
Job identifiers are stored in Hadoop CallerContext and propagated via EnvironmentContext to HMS audit logs for easier debugging. A TrafficControlListener monitors QPS and memory usage per function/user; when thresholds are exceeded, calls are throttled or connections are closed to protect HMS stability.
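The throttling half of this can be modelled as a sliding-window counter per user and function. The Python sketch below covers only the QPS check; memory monitoring and connection handling are left out, and the class name, parameters, and window length are assumptions rather than details of the real TrafficControlListener.

```python
from collections import defaultdict, deque


class TrafficControl:
    """Sliding-window call limiter keyed by (user, function)."""

    def __init__(self, max_calls, window_seconds=1.0):
        self.max_calls = max_calls
        self.window = window_seconds
        self.calls = defaultdict(deque)

    def allow(self, user, function, now):
        """Return True if the call is admitted at timestamp now (seconds)."""
        recent = self.calls[(user, function)]
        # Forget calls that have left the window.
        while recent and now - recent[0] >= self.window:
            recent.popleft()
        if len(recent) >= self.max_calls:
            return False
        recent.append(now)
        return True
```

Passing the timestamp in explicitly keeps the limiter deterministic; a real listener would read a monotonic clock instead.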
Future Work
Investigate a truly remote shuffle service for Kubernetes‑native deployments.
Adopt vectorized execution to further accelerate Spark workloads.
Enhance automated diagnostics to improve user experience.
ITPUB
