Bilibili Offline Computing Platform: Migration from Hive to Spark and Comprehensive Performance Optimizations
The article details Bilibili's evolution of its offline computing platform from Hadoop‑based Hive to Spark, describing migration tools, SQL conversion, result and resource comparison, shuffle stability, small‑file handling, runtime filters, data skipping, ZSTD support, Hive Metastore federation, traffic control, and future optimization directions.
Background
Since 2018, Bilibili has built out an offline computing service on Hadoop, expanding from a few hundred nodes to nearly ten thousand across multiple data centers and running about 200,000 batch jobs per day on Hive, Spark, and Presto.
From Hive to Spark
In early 2021 Hive handled >80% of offline jobs, Spark 2.4 about 20%. After Spark 3.1 was released, Bilibili migrated Hive‑SQL to Spark‑SQL, first manually, then with an automated migration tool that rewrites SQL, replaces input/output tables, and performs dual‑run result comparison.
SQL Conversion
The SparkSqlParser was rewritten to replace tables in DAG‑level jobs, convert SELECT statements to CTAS for result comparison, and skip or mark DDL statements that do not require computation.
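As an illustration of the rewrite (table names here are hypothetical), a task's final SELECT is turned into a CTAS so that the Spark run lands in a shadow table that can later be diffed against the Hive output:

    -- original final statement of the task being migrated
    SELECT uid, count(1) AS pv
    FROM dw.page_view
    WHERE log_date = '20211124'
    GROUP BY uid;

    -- rewritten by the migration tool for the dual run, writing to a shadow output table
    CREATE TABLE tmp_migrate.page_view_pv_spark AS
    SELECT uid, count(1) AS pv
    FROM dw.page_view
    WHERE log_date = '20211124'
    GROUP BY uid;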
Result Comparison
Dual‑run results are compared by schema and then by full‑table data using a GROUP BY + UNION ALL + GROUP BY strategy; mismatched rows are identified. Resource consumption (execution time, CPU, memory) is also compared, showing >40% execution‑time reduction and >30% overall resource savings after migration.
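A minimal sketch of that comparison, assuming two shadow output tables result_hive and result_spark with identical schemas: each side is aggregated per distinct row, the aggregates are unioned, and a second aggregation surfaces rows whose counts differ between the two engines:

    SELECT c1, c2,
           sum(hive_cnt)  AS hive_cnt,
           sum(spark_cnt) AS spark_cnt
    FROM (
        SELECT c1, c2, count(1) AS hive_cnt, 0 AS spark_cnt FROM result_hive  GROUP BY c1, c2
        UNION ALL
        SELECT c1, c2, 0 AS hive_cnt, count(1) AS spark_cnt FROM result_spark GROUP BY c1, c2
    ) merged
    GROUP BY c1, c2
    HAVING sum(hive_cnt) <> sum(spark_cnt);  -- mismatched rows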
Migration & Rollback
Each migrated task is run at least three times with dual‑run checks; post‑migration monitoring compares the first three executions against the pre‑migration average, rolling back tasks that regress.
Spark Practice at Bilibili
Small-File Problem: Two approaches were added, a fallback merge of small files after the write and a repartition-based merge that uses Spark 3's AQE rebalance hint to coalesce files before the final write.
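A sketch of the repartition-based merge, assuming the REBALANCE hint that upstream Spark added in 3.2 (table names are illustrative); AQE coalesces or splits the rebalanced shuffle partitions so the final write produces a small number of reasonably sized files:

    -- rewrite the write stage so AQE can size the output partitions before the final write
    INSERT OVERWRITE TABLE dw.target_table PARTITION (log_date = '20211124')
    SELECT /*+ REBALANCE */ uid, avid, play_duration
    FROM dw.source_table
    WHERE log_date = '20211124';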
Shuffle Stability: SSD-backed shuffle directories, a remote shuffle service (RSS) with push-based shuffle, and a shuffle-service master for node registration and health-aware node selection were implemented, improving shuffle stability and cutting execution time by ~25% for large jobs.
Large Result Set Handling: When driver memory usage is high, results are streamed to disk and fetched in batches.
SQL Task Parallelism & Limits: Dynamic limits on task count and parallelism are applied based on executor usage.
Dangerous Join Detection & Join Inflation: Join strategies are analyzed to flag joins that may cause OOM or excessive cost, and join inflation rates are monitored.
Skewed Key Detection: Partition sizes are examined; skewed keys are sampled and reported to a diagnostic platform.
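The sampling behind that detection could look like the following sketch (hypothetical table and key), which ranks the heaviest values of a join key so they can be reported to the diagnostic platform:

    -- sample the input and surface the most frequent join-key values
    SELECT avid, count(1) AS cnt
    FROM dw.play_log TABLESAMPLE (1 PERCENT)
    WHERE log_date = '20211124'
    GROUP BY avid
    ORDER BY cnt DESC
    LIMIT 20;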
Performance Optimizations
Compatibility between dynamic partition pruning (DPP) and adaptive query execution (AQE) was back-ported to Spark 3.2; AQE now supports ShuffledHashJoin via a DynamicJoin rule; runtime filters (dynamic bloom-filter pruning) were added to prune large tables before the shuffle, reducing rows from billions to tens of thousands in tests.
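To illustrate the effect with hypothetical tables: in a join where only a small slice of the dimension table survives its filter, the runtime bloom filter built from that slice prunes the large fact table before it is shuffled, roughly as if the query carried an extra membership predicate:

    SELECT f.uid, sum(f.play_duration) AS total_duration
    FROM dw.play_log f
    JOIN dw.archive_dim d ON f.avid = d.avid
    WHERE d.category = 'tech' AND f.log_date = '20211124'
    GROUP BY f.uid;

    -- conceptually, the runtime filter acts like an injected predicate on the fact table:
    --   ... AND f.avid IN (SELECT avid FROM dw.archive_dim WHERE category = 'tech')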
Data Skipping
ORC and Parquet statistics (min/max, count, sum) are leveraged; additional ordering (z‑ordering) of hot columns is applied to improve skipping, with example SQL shown below:
    select count(1) from tpcds.archive_spl_cluster where log_date = '20211124' and state = -16

Table conversion and small-file merge syntax were introduced:

    CONVERT TABLE target=tableIdentifier (convertFormat | compressType) partitionClause?    #convertTable
    MERGE TABLE target=tableIdentifier partitionClause?                                     #mergeTable
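Hypothetical invocations derived from those grammar rules (the exact format and compression keywords may differ in the actual implementation):

    CONVERT TABLE tpcds.archive_spl_cluster ORC PARTITION (log_date = '20211124');
    MERGE TABLE tpcds.archive_spl_cluster PARTITION (log_date = '20211124');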
Functional Improvements
ZSTD compression support was back-ported; a bug in ORC's min/max statistics for long strings was fixed and contributed upstream. Multi-format table reads were made compatible by refining DataSourceScanExec to handle per-partition formats.
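A sketch of enabling ZSTD for ORC output, assuming the back-ported codec is exposed through the standard ORC compression options (table names are illustrative):

    -- session-level default for ORC writes
    SET spark.sql.orc.compression.codec=zstd;

    -- or per table, via the data source option
    CREATE TABLE dw.play_log_zstd
    USING ORC
    OPTIONS (compression 'zstd')
    AS SELECT * FROM dw.play_log WHERE log_date = '20211124';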
Smart Data Manager (SDM)
SDM provides asynchronous table format conversion, data re‑organization (order/z‑order), statistics collection, small‑file merging, Hive index creation, and lineage extraction, all with transparent Hive‑style lock management.
Hive Metastore Optimizations
MetaStore federation was chosen over WaggleDance to enable per‑IDC MySQL storage with minimal migration impact. Request tracing via Hadoop CallerContext and EnvironmentContext was added, along with traffic‑control listeners that throttle high‑QPS or large‑partition queries and can proactively close offending connections.
Future Work
Plans include researching remote shuffle services for Kubernetes, applying vectorized execution to Spark, and enhancing automated diagnostics.