How Bilibili Scaled Offline Processing Across Multiple Data Centers
This article details Bilibili's multi‑data‑center solution for offline big‑data workloads: the capacity limits that motivated it, the unit‑based architecture, job placement, data replication, routing, versioning, bandwidth throttling, traffic analysis, and future directions.
Background
Rapid growth of Bilibili's offline clusters exhausted the available rack capacity, so the platform needed a multi‑data‑center architecture rather than further scaling within a single data center.
Problem Statement
Spreading offline workloads across data centers raises two main challenges: (1) bandwidth consumption when massive offline batch traffic crosses data centers, and (2) network jitter or outages between sites, which can degrade job performance or cause data loss.
Design Overview
Adopt a unit‑based architecture: each data‑center (IDC) is an independent unit hosting a full YARN + HDFS stack and all services required for jobs placed in that unit. This isolates failures and eliminates most cross‑site traffic. Job placement and data replication are coordinated through a central DataManager service. Data access is routed by an extended HDFS Router that selects the nearest replica based on client IP. A version service guarantees replica consistency, and a token‑bucket throttling service limits cross‑site bandwidth.
Key Components
DataManager – REST API storing IDC assignment for each job and path‑based replication rules. Scheduling systems (Archer, Airflow) query it to obtain placement, set YARN app tags, and verify replica readiness.
Job Placement – Periodic DAG analysis builds a dependency graph, applies community‑detection to partition the graph into tightly‑coupled sub‑units, and assigns each sub‑unit to an IDC respecting resource constraints.
Data Replication Service (DRS) – Enhanced DistCp implementation providing correctness, atomicity, idempotence, flow control, multi‑tenant priority, and lifecycle management. Path rules from DataManager determine which paths to replicate; Hive Metastore events are streamed to Kafka and processed by Flink to detect hot partitions, which then trigger DRS to create cross‑IDC replicas. Replica metadata (path, version, TTL) is persisted for later cleanup.
HDFS Router with Multi‑Mount Points – Primary mount holds the original data; additional read‑only mounts hold replicas. Router inspects client IP, maps it to an IDC, and redirects reads/writes to the nearest replica. A special IDC_FOLLOW mount handles temporary tables with nondeterministic paths.
Version Service – Subscribes to HDFS JournalNode edit logs, uses transaction IDs as version identifiers, and exposes an API for jobs to verify that a replica matches the latest version before execution.
Throttle Service – Global token‑bucket service. ThrottledDistributedFileSystem checks whether a read/write would cross IDC boundaries, requests a token from ThrottleService, and proceeds only if granted. Tokens represent large bandwidth units; priority queues enforce weighted fairness. Fallback to local fixed bandwidth is used if the service is unavailable.
Cross‑IDC Traffic Analyzer – DFSClient is instrumented to embed JobId; DataNode extracts JobId and client IP, aggregates traffic per job every 30 s, and writes logs to Kafka. Flink pipelines forward logs to ClickHouse, producing top‑N cross‑IDC traffic jobs for remediation.
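The Throttle Service behaviour described above reduces to a small per-request decision. The Java sketch below is illustrative only: `CrossIdcGate`, `ThrottleClient`, and the `Decision` values are hypothetical names rather than Bilibili's actual API, the client and data IDCs are assumed to be already resolved, and an `IOException` from the service stands in for "service unavailable".

```java
import java.io.IOException;

/**
 * Sketch of the decision a throttled file system client could make before a
 * read or write. All type and method names here are hypothetical.
 */
class CrossIdcGate {

    /** Hypothetical client for the global Throttle Service. */
    interface ThrottleClient {
        /** Returns true if tokens covering {@code bytes} were granted to the job. */
        boolean requestTokens(String jobId, long bytes) throws IOException;
    }

    enum Decision { PROCEED_LOCAL, PROCEED_GRANTED, WAIT_FOR_TOKENS, PROCEED_FALLBACK }

    private final ThrottleClient service;
    private final long fallbackBytesPerSec; // fixed local bandwidth used when the service is down

    CrossIdcGate(ThrottleClient service, long fallbackBytesPerSec) {
        this.service = service;
        this.fallbackBytesPerSec = fallbackBytesPerSec;
    }

    Decision decide(String jobId, String clientIdc, String dataIdc, long bytes) {
        if (clientIdc.equals(dataIdc)) {
            return Decision.PROCEED_LOCAL; // same IDC: no cross-site traffic, no token needed
        }
        try {
            return service.requestTokens(jobId, bytes) ? Decision.PROCEED_GRANTED : Decision.WAIT_FOR_TOKENS;
        } catch (IOException serviceUnavailable) {
            return Decision.PROCEED_FALLBACK; // degrade to the local fixed-rate limit
        }
    }

    /** Minimum duration, in ms, of a transfer of {@code bytes} under the fallback rate. */
    long fallbackDelayMillis(long bytes) {
        return bytes * 1000 / fallbackBytesPerSec;
    }

    public static void main(String[] args) {
        CrossIdcGate gate = new CrossIdcGate((jobId, bytes) -> true, 100L << 20);
        System.out.println(gate.decide("job_1", "IDC1", "IDC2", 1L << 30));
    }
}
```

Returning WAIT_FOR_TOKENS instead of blocking inside the gate leaves the retry policy to the caller; the fallback branch is where a fixed local rate, modelled here only as a minimum transfer duration, would apply.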
Workflow Example (Hive Job)
Dependency analysis runs daily, partitions the DAG, and stores placement info in DataManager.
When Archer/Airflow submits a Hive job, it queries DataManager for the target IDC and checks replica readiness. If replicas are not ready, submission is blocked until DRS finishes replication.
The job is launched on the YARN cluster of the target IDC; the app tag enables federation‑aware scheduling.
During execution, the Hive driver accesses HDFS. The Router maps the client's IP to an IDC, resolves the nearest replica mount point, and serves the data locally.
Before reading a replica, the job calls the Version Service to check that the replica's version matches the latest edit‑log transaction ID. On a mismatch, the job blocks until the replica is up to date.
All cross‑IDC reads and writes pass through ThrottledDistributedFileSystem, which throttles them via the Throttle Service and falls back to a local fixed bandwidth limit if that service is unavailable.
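The submission checks in this workflow (placement lookup, replica readiness, version match) can be sketched as one gate. This is a hedged illustration, not the Archer/Airflow integration: the in-memory maps stand in for DataManager and Version Service responses, and `SubmissionGate` and the "idc:path" replica key are hypothetical.

```java
import java.util.List;
import java.util.Map;

/**
 * Sketch of the pre-submission checks in the workflow. The maps are hypothetical
 * snapshots of what DataManager and the Version Service would return.
 */
class SubmissionGate {

    static String check(String jobId,
                        Map<String, String> placement,      // job -> target IDC (DataManager)
                        Map<String, String> primaryIdc,     // input path -> IDC holding the primary copy
                        List<String> inputs,                // input paths the job reads
                        Map<String, Long> latestVersion,    // path -> latest edit-log txid
                        Map<String, Long> replicaVersion) { // "idc:path" -> txid the replica was copied at
        String idc = placement.get(jobId);
        if (idc == null) {
            return "BLOCK: no placement for " + jobId;
        }
        for (String path : inputs) {
            if (idc.equals(primaryIdc.get(path))) {
                continue; // primary copy already lives in the target IDC
            }
            Long copied = replicaVersion.get(idc + ":" + path);
            if (copied == null) {
                return "BLOCK: replica of " + path + " not ready in " + idc; // wait for DRS
            }
            if (!copied.equals(latestVersion.get(path))) {
                return "BLOCK: stale replica of " + path; // wait until the replica catches up
            }
        }
        return "SUBMIT to " + idc;
    }
}
```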
Implementation Details
1. Job Placement
Dependency analysis builds a DAG from scheduling metadata, then applies community detection (e.g., the Louvain algorithm) to identify sub‑graphs with high internal coupling. Each sub‑graph is assigned to an IDC based on resource availability and data locality. The result is persisted in DataManager as {job_id: {target_idc, path_rules}}. Archer sets these rules via the DataManager API before job submission.
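The article names Louvain only as an example. As a hedged illustration, the sketch below implements just the first, local-moving phase of Louvain-style modularity optimisation (Louvain's community-aggregation phase is omitted), treating dependency edges as undirected and unweighted. Job names, the edge-list input, and `DagCommunities` are hypothetical; edges could also be weighted, for example by bytes exchanged between jobs, but this sketch does not do that.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Sketch: the local-moving phase of Louvain-style modularity optimisation on a
 * job-dependency DAG, with edges treated as undirected and unweighted.
 */
class DagCommunities {

    /** Each edge is {upstreamJob, downstreamJob}; returns job -> community id. */
    static Map<String, String> detect(List<String[]> edges, int maxRounds) {
        Map<String, Set<String>> adj = new TreeMap<>();
        for (String[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        double twoM = 2.0 * edges.size();               // assumes no duplicate edges or self-loops
        Map<String, String> comm = new TreeMap<>();      // job -> community id
        Map<String, Integer> commDeg = new TreeMap<>();  // community id -> sum of member degrees
        for (Map.Entry<String, Set<String>> n : adj.entrySet()) {
            comm.put(n.getKey(), n.getKey());            // every job starts in its own community
            commDeg.put(n.getKey(), n.getValue().size());
        }
        for (int round = 0; round < maxRounds; round++) {
            boolean moved = false;
            for (String job : adj.keySet()) {            // fixed visiting order keeps results deterministic
                int deg = adj.get(job).size();
                String home = comm.get(job);
                commDeg.merge(home, -deg, Integer::sum); // take the job out of its community
                Map<String, Integer> links = new TreeMap<>(); // community -> edges from this job into it
                for (String nb : adj.get(job)) {
                    links.merge(comm.get(nb), 1, Integer::sum);
                }
                String best = home;                      // ties keep the job where it was
                double bestGain = gain(links.getOrDefault(home, 0), commDeg.get(home), deg, twoM);
                for (Map.Entry<String, Integer> c : links.entrySet()) {
                    double g = gain(c.getValue(), commDeg.get(c.getKey()), deg, twoM);
                    if (g > bestGain) {
                        best = c.getKey();
                        bestGain = g;
                    }
                }
                comm.put(job, best);
                commDeg.merge(best, deg, Integer::sum);
                moved |= !best.equals(home);
            }
            if (!moved) {
                break;                                   // no job changed community: converged
            }
        }
        return comm;
    }

    /** Modularity gain of joining a community, multiplied by m (the edge count). */
    private static double gain(int linksIn, int communityDeg, int jobDeg, double twoM) {
        return linksIn - communityDeg * (double) jobDeg / twoM;
    }
}
```

On two triangles of jobs joined by a single edge, this groups each triangle into its own community, which is the kind of tightly coupled sub‑unit that would then be assigned to one IDC.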
2. Data Replication
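DRS wraps Apache DistCp (hadoop distcp) with additional steps:
# Example DistCp command used by DRS
# -update copies only missing or changed files; -delete removes target files absent
# from the source; -bandwidth caps each map task at 1000 MB/s
hadoop distcp -update -delete -bandwidth 1000 \
  hdfs://source-ns/path/to/table/partition \
  hdfs://target-ns/path/to/table/partition
Enhancements include: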
Transactional commit: copy to a temporary location then rename.
Idempotent retries using checksum comparison.
Per‑tenant bandwidth quotas and priority.
Lifecycle: TTL metadata stored in DataManager; a cleanup job removes expired replicas.
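The transactional-commit and idempotent-retry points above can be sketched with java.nio.file on a local file system standing in for HDFS; DRS itself works through DistCp and HDFS APIs, which this does not model. SHA-256 over whole file contents substitutes for HDFS file checksums, and the `.drs-tmp-` prefix and `AtomicReplicaCopy` are hypothetical. On POSIX file systems an atomic rename replaces an existing target; other platforms may reject ATOMIC_MOVE onto an existing file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/**
 * Sketch: transactional, idempotent replica copy on a local file system standing in for HDFS.
 */
class AtomicReplicaCopy {

    /** Returns true if a copy was committed, false if the target was already identical. */
    static boolean copy(Path source, Path target) throws IOException {
        byte[] expected = digest(source);
        // Idempotent retry: an identical replica needs no work.
        if (Files.exists(target) && Arrays.equals(expected, digest(target))) {
            return false;
        }
        Files.createDirectories(target.getParent());
        // Stage next to the target so the final rename stays within one directory.
        Path tmp = target.resolveSibling(".drs-tmp-" + target.getFileName());
        Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING); // overwrite leftovers of a failed attempt
        if (!Arrays.equals(expected, digest(tmp))) {
            Files.delete(tmp);
            throw new IOException("checksum mismatch while copying " + source);
        }
        // Commit: readers see either the old replica or the complete new one, never a partial file.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        return true;
    }

    /** SHA-256 of the whole file; fine for a sketch, not for multi-GB files. */
    static byte[] digest(Path file) throws IOException {
        try {
            return MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(file));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }
}
```

Calling `copy` twice with an unchanged source commits once and then returns false, which is what makes blind retries after a failure safe.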
3. Data Routing
HDFS Router configuration (RBF) defines a mount table:
# Example mount table entry
/mount/primary hdfs://dc1-ns rw
/mount/replica1 hdfs://dc2-ns ro
/mount/replica2 hdfs://dc3-ns ro
When a client from IDC 2 accesses /mount/primary, the Router rewrites the request to /mount/replica1. For temporary tables, the IDC_FOLLOW mount contains a list of replica mounts, and the Router selects the entry whose IDC matches the client's.
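To make the IP-based routing concrete, the sketch below maps a client IPv4 address to an IDC by longest-prefix CIDR match and returns the mount located in that IDC, falling back to the primary. The CIDR ranges, `IdcRouter`, and its methods are hypothetical; this is not how RBF's own mount-table resolver is implemented. The same selection would apply to the replica list behind an IDC_FOLLOW mount.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch: map a client IPv4 address to its IDC by longest-prefix CIDR match,
 * then pick the mount point located in that IDC. Ranges and names are hypothetical.
 */
class IdcRouter {
    private final Map<String, String> cidrToIdc = new LinkedHashMap<>();  // "10.2.0.0/16" -> "IDC2"
    private final Map<String, String> mountToIdc = new LinkedHashMap<>(); // "/mount/replica1" -> "IDC2"
    private final String primaryMount;

    IdcRouter(String primaryMount) {
        this.primaryMount = primaryMount;
    }

    void addRange(String cidr, String idc) { cidrToIdc.put(cidr, idc); }

    void addMount(String mount, String idc) { mountToIdc.put(mount, idc); }

    /** Returns the IDC of the most specific matching range, or null if none matches. */
    String idcOf(String ipv4) {
        long ip = toLong(ipv4);
        String best = null;
        int bestBits = -1;
        for (Map.Entry<String, String> e : cidrToIdc.entrySet()) {
            String[] parts = e.getKey().split("/");
            int bits = Integer.parseInt(parts[1]);
            long mask = (0xFFFFFFFFL << (32 - bits)) & 0xFFFFFFFFL; // /0 yields mask 0
            if (bits > bestBits && (ip & mask) == (toLong(parts[0]) & mask)) {
                best = e.getValue();
                bestBits = bits;
            }
        }
        return best;
    }

    /** Mount that should serve the client: the one in its IDC, otherwise the primary. */
    String resolve(String clientIp) {
        String idc = idcOf(clientIp);
        for (Map.Entry<String, String> m : mountToIdc.entrySet()) {
            if (m.getValue().equals(idc)) {
                return m.getKey();
            }
        }
        return primaryMount;
    }

    private static long toLong(String ipv4) {
        long v = 0;
        for (String octet : ipv4.split("\\.")) {
            v = (v << 8) | Integer.parseInt(octet);
        }
        return v;
    }
}
```

For example, with 10.2.0.0/16 registered as IDC2 and /mount/replica1 located in IDC2, a client at 10.2.0.7 resolves to /mount/replica1, while a client in no registered range gets the primary mount.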
4. Version Service
Version Service runs a JournalNode observer:
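# Version update: the latest edit-log txid that touched a watched path becomes its version
def update_versions(editlog_entries, watched_paths, version_map):
    for entry in editlog_entries:
        if entry.path in watched_paths:
            version_map[entry.path] = entry.txid
Jobs query /version?path=... to obtain the latest txid and compare it with the replica's stored version.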
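A Java version of the version-tracking loop above, with two assumptions made explicit: watched paths are directories (such as tables), so an edit to any file beneath one bumps its version, and each edit-log operation is reduced to a single (txid, path) pair, which ignores multi-path operations such as rename. `VersionService` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Sketch of the Version Service: a watched path's version is the txid of the latest
 * edit-log operation that touched it or anything beneath it.
 */
class VersionService {
    private final List<String> watched;                  // e.g. table or partition directories
    private final Map<String, Long> versions = new TreeMap<>();

    VersionService(List<String> watchedPaths) {
        this.watched = watchedPaths;
    }

    /** Apply one edit-log operation: its txid and the path it modified. */
    void onEdit(long txid, String path) {
        for (String w : watched) {
            if (path.equals(w) || path.startsWith(w + "/")) {
                versions.merge(w, txid, Long::max);     // txids only increase; tolerate replays
            }
        }
    }

    /** What a /version?path=... lookup would return; -1 if the path was never modified. */
    long latest(String watchedPath) {
        return versions.getOrDefault(watchedPath, -1L);
    }

    /** A replica may be read only if it was copied at the latest version. */
    boolean isFresh(String watchedPath, long replicaVersion) {
        return replicaVersion == latest(watchedPath);
    }
}
```

Matching on `w + "/"` rather than a bare prefix keeps an edit under /warehouse/db/t10 from bumping the version of /warehouse/db/t1.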
5. Bandwidth Throttling
Token‑bucket parameters (example):
Token size = 10 GB.
Bucket capacity = 100 tokens per IDC.
Priority weights: high‑priority jobs receive 2× tokens.
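Applying the example parameters, here is a sketch of a per-IDC bucket. The 10 GB token size and 100-token capacity come from the list above; the refill rate, the explicit millisecond clock (used for determinism), and reading "2× tokens" as a 2:1 split between two waiting priority classes are assumptions. `IdcTokenBucket` is a hypothetical name.

```java
/**
 * Sketch of a per-IDC token bucket using the example parameters from the text.
 * The refill rate and the two-class weighted split are assumptions.
 */
class IdcTokenBucket {
    static final long TOKEN_BYTES = 10L << 30;   // 1 token = 10 GB (taken as 10 GiB here)
    static final long CAPACITY = 100;            // bucket capacity per IDC, in tokens
    static final long HIGH_WEIGHT = 2;           // high-priority jobs get twice the share ...
    static final long NORMAL_WEIGHT = 1;         // ... of normal-priority jobs

    private final double refillPerSecond;        // hypothetical refill rate, tokens per second
    private double tokens = CAPACITY;            // bucket starts full
    private long lastMillis;

    IdcTokenBucket(double refillPerSecond, long nowMillis) {
        this.refillPerSecond = refillPerSecond;
        this.lastMillis = nowMillis;
    }

    /** Whole tokens needed to move {@code bytes} across IDCs (rounded up). */
    static long tokensFor(long bytes) {
        return (bytes + TOKEN_BYTES - 1) / TOKEN_BYTES;
    }

    /**
     * Grants whole tokens to two waiting priority classes, splitting by weight and
     * handing any share one class cannot use to the other. Returns {high, normal}.
     */
    long[] grant(long demandHigh, long demandNormal, long nowMillis) {
        tokens = Math.min(CAPACITY, tokens + (nowMillis - lastMillis) / 1000.0 * refillPerSecond);
        lastMillis = nowMillis;
        long avail = (long) Math.floor(tokens);
        long high = Math.min(demandHigh, avail * HIGH_WEIGHT / (HIGH_WEIGHT + NORMAL_WEIGHT));
        long normal = Math.min(demandNormal, avail - high);
        high = Math.min(demandHigh, avail - normal); // reclaim what normal did not need
        tokens -= high + normal;
        return new long[] {high, normal};
    }
}
```

For instance, a full bucket facing 100 tokens of demand from each class grants 66 high-priority and 34 normal tokens; when one class needs less than its share, the remainder goes to the other.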
When ThrottledDistributedFileSystem detects a cross‑IDC block read, it calls:
Token token = ThrottleService.requestTokens(jobId, requiredBytes);
if (token == null) {
    // Not granted: block until tokens are granted, or fall back to the local fixed bandwidth limit
}
6. Traffic Analysis
Instrumentation steps:
DFSClient constructor adds JobId to ClientName.
DataNode’s DataXceiver extracts JobId and client IP, aggregates bytes per (JobId, IDC) pair, and emits a log line every 30 s.
Flink job reads the logs, writes aggregated metrics to ClickHouse, and a dashboard shows top‑10 cross‑IDC traffic jobs.
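The per-job aggregation and top-N steps can be sketched in a few lines. Assumptions: the "#job=<JobId>" ClientName suffix is a hypothetical encoding (the article does not give the format), the class is single-threaded although a real DataNode serves transfers on many DataXceiver threads, and the ranking function stands in for the ClickHouse query behind the dashboard. `CrossIdcTraffic` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch: DataNode-side aggregation of cross-IDC bytes per (JobId, client IDC),
 * flushed per window, plus the downstream top-N ranking. Not thread-safe.
 */
class CrossIdcTraffic {
    private final String dataNodeIdc;
    private Map<String, Long> window = new HashMap<>(); // "jobId|clientIdc" -> bytes

    CrossIdcTraffic(String dataNodeIdc) {
        this.dataNodeIdc = dataNodeIdc;
    }

    /** Hypothetical convention: the client appends "#job=<JobId>" to its ClientName. */
    static String jobIdOf(String clientName) {
        int i = clientName.lastIndexOf("#job=");
        return i < 0 ? "unknown" : clientName.substring(i + "#job=".length());
    }

    /** Called per block transfer; traffic from clients in the DataNode's own IDC is skipped. */
    void record(String clientName, String clientIdc, long bytes) {
        if (!dataNodeIdc.equals(clientIdc)) {
            window.merge(jobIdOf(clientName) + "|" + clientIdc, bytes, Long::sum);
        }
    }

    /** Called on a timer (every 30 s in the article): emit the window and start a new one. */
    Map<String, Long> flush() {
        Map<String, Long> out = window;
        window = new HashMap<>();
        return out;
    }

    /** Downstream ranking: merge flushed windows and return the n jobs with the most cross-IDC bytes. */
    static List<String> topJobs(List<Map<String, Long>> windows, int n) {
        Map<String, Long> perJob = new HashMap<>();
        for (Map<String, Long> w : windows) {
            for (Map.Entry<String, Long> e : w.entrySet()) {
                String job = e.getKey().substring(0, e.getKey().lastIndexOf('|'));
                perJob.merge(job, e.getValue(), Long::sum);
            }
        }
        List<String> jobs = new ArrayList<>(perJob.keySet());
        jobs.sort((a, b) -> Long.compare(perJob.get(b), perJob.get(a))); // largest first
        return new ArrayList<>(jobs.subList(0, Math.min(n, jobs.size())));
    }
}
```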
Results and Outlook
The solution has been in production for more than six months, has migrated about 300 PB of data, and handles about 33% of offline jobs. Bandwidth saturation and network‑induced failures have been significantly reduced.
Future work includes:
Automating DAG community detection for continuous migration planning.
Extending the architecture to support more dynamic workloads (e.g., streaming).
Further unit‑based decomposition to achieve “active‑active” high‑availability for critical jobs.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
