How Bilibili Optimized Flink Runtime for Massive Real‑Time Jobs
This article details Bilibili's extensive enhancements to the Flink runtime—including checkpoint recoverability, max‑parallelism calculations, State Processor API extensions, Full and Regional Checkpoints, hybrid HA, task‑level recovery, load‑balanced partitioners, and large‑scale cluster maintenance—to improve reliability and performance of its billion‑scale streaming workloads.
Background
Bilibili runs its Flink‑based real‑time computing platform on three kinds of YARN clusters: pure physical clusters, clusters co‑located with Kafka, and clusters co‑located with K8s. The deployment includes over 1,000 servers, plus 2,000+ cores in the Kafka‑mixed clusters and 6,000+ cores in the K8s‑mixed clusters. The platform serves AI, advertising, data warehousing, and many other businesses, and the largest job runs at a parallelism of 2,000.
Checkpoint‑Related Improvements
Recoverability
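To keep operator IDs stable when job parallelism changes, Bilibili extended the community StreamGraphHasherV2 into a StreamGraphHasherV3 that ignores downstream chaining relationships when generating IDs. Operators are chained only when their parallelisms match, so a rescale can break or create chains. Under V2 that changes the auto‑generated operator IDs and makes existing checkpoints incompatible.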
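The following is a minimal sketch of the idea. It assumes auto‑generated IDs: when a job sets `uid(...)`, Flink hashes that string instead. The community hasher mixes in one extra entry per chainable output edge, so a node's ID depends on its chaining. Several parts are illustrative assumptions: the `Node` record, the `operatorId` method, and the use of MD5 in place of Flink's Murmur3 hasher. This is not Bilibili's StreamGraphHasherV3.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

/** Hypothetical sketch: operator-ID hashing with (V2-like) and without (V3-like) chaining input. */
public class OperatorIdHashSketch {

    /** Simplified stand-in for a stream node: traversal position, chainable outputs, upstream IDs. */
    public record Node(int traversalIndex, int chainableOutputs, List<String> upstreamIds) {}

    public static String operatorId(Node node, boolean includeChaining) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5"); // stand-in for Murmur3
            byte[] local = ByteBuffer.allocate(4).putInt(node.traversalIndex()).array();
            md.update(local);
            if (includeChaining) {
                // V2-like: one extra entry per chainable output. Chainability requires equal
                // parallelism, so rescaling can change this count and therefore the ID.
                for (int i = 0; i < node.chainableOutputs(); i++) {
                    md.update(local);
                }
            }
            for (String upstream : node.upstreamIds()) {
                md.update(upstream.getBytes(StandardCharsets.UTF_8));
            }
            return String.format("%032x", new BigInteger(1, md.digest()));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is available on every JVM", e);
        }
    }

    public static void main(String[] args) {
        Node chained = new Node(2, 1, List.of("source"));  // chained to its downstream operator
        Node unchained = new Node(2, 0, List.of("source")); // chain broken after a parallelism change
        System.out.println("V2-like stable: "
                + operatorId(chained, true).equals(operatorId(unchained, true)));
        System.out.println("V3-like stable: "
                + operatorId(chained, false).equals(operatorId(unchained, false)));
    }
}
```

Dropping the chaining term is the design choice described above. The ID then depends only on the node's position and its inputs, and neither changes when a chain is split by rescaling.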
Max‑Parallelism Calculation
The community algorithm computes the default maximum parallelism as:

```java
Math.min(
    Math.max(MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)), 128),
    32768);
```

Bilibili raised the lower bound from 128 to 1,024 and multiplied the base by ten before rounding up to a power of two:

```java
Math.min(
    Math.max(
        MathUtils.roundUpToPowerOfTwo((operatorParallelism + (operatorParallelism / 2)) * 10),
        1024),
    32768);
```

Changing the maximum parallelism changes how keys map to key groups. Bilibili therefore also recomputes the key‑group assignments of existing state files to match the new maximum parallelism.
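To compare the two formulas numerically, the sketch below reimplements both. `MathUtils.roundUpToPowerOfTwo` is replaced with a local helper for inputs of at least 1, so the class runs without Flink on the classpath. The class and method names are hypothetical.

```java
/** Compares the community default max parallelism with the modified formula described above. */
public class MaxParallelismSketch {

    /** Smallest power of two >= x, for 1 <= x <= 2^30 (local stand-in for Flink's MathUtils helper). */
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    /** Community default: 1.5x parallelism, rounded up, clamped to [128, 32768]. */
    public static int communityDefault(int parallelism) {
        return Math.min(Math.max(roundUpToPowerOfTwo(parallelism + parallelism / 2), 128), 32768);
    }

    /** Modified default: base multiplied by 10, lower bound raised to 1024, same upper bound. */
    public static int bilibiliDefault(int parallelism) {
        return Math.min(
                Math.max(roundUpToPowerOfTwo((parallelism + parallelism / 2) * 10), 1024), 32768);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 10, 100, 500, 2000}) {
            System.out.printf("parallelism=%d community=%d modified=%d%n",
                    p, communityDefault(p), bilibiliDefault(p));
        }
    }
}
```

At the article's maximum parallelism of 2,000, the community default is 4,096, while the modified formula reaches the 32,768 cap. That leaves far more headroom for later scale‑ups before the maximum parallelism itself must change.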
State Processor API Enhancements
The native API works on one operator at a time and must fully deserialize every value, which requires knowing each state's value type. To work around this, Bilibili added:
Reading all operator IDs from the _metadata file and invoking the API per operator.
Embedding necessary metadata in _metadata to construct StateDescriptor automatically.
Modifying the API to deserialize only keys (treating values as raw byte arrays).
These changes enable key‑group recomputation and restore jobs after parallelism changes.
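The key‑only approach above can be sketched as a rewrite of each raw keyed‑state key, with the value bytes copied through untouched. The following assumptions are worth flagging:

- The raw key layout is simplified to `[key-group prefix][serialized key]`; real RocksDB keys also carry the namespace.
- The key hash is taken from the serialized bytes. Flink instead uses the deserialized key's `hashCode()`.
- The hash mixing is a stand‑in for Flink's murmur‑based assignment.

Two details follow Flink's behavior: the 1‑byte/2‑byte prefix rule and the key‑group‑to‑subtask range split. All class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch: re-prefix a raw keyed-state key for a new max parallelism. */
public class KeyGroupRewriteSketch {

    /** Flink uses a 1-byte key-group prefix for up to 128 key groups, 2 bytes above that. */
    public static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    /** Stand-in assignment: mix the key hash (murmur3 finalizer), then take it modulo maxParallelism. */
    public static int keyGroupFor(int keyHash, int maxParallelism) {
        int h = keyHash;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return Math.floorMod(h, maxParallelism);
    }

    /** Range split that maps a key group to an operator subtask. */
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Strips the old prefix, recomputes the key group from the key bytes only, writes the new prefix. */
    public static byte[] rewriteKey(byte[] rawKey, int oldMaxParallelism, int newMaxParallelism) {
        byte[] keyBytes = Arrays.copyOfRange(rawKey, prefixBytes(oldMaxParallelism), rawKey.length);
        // Assumption: hash of the serialized key bytes stands in for the key's hashCode().
        int keyGroup = keyGroupFor(Arrays.hashCode(keyBytes), newMaxParallelism);
        ByteBuffer out = ByteBuffer.allocate(prefixBytes(newMaxParallelism) + keyBytes.length);
        if (prefixBytes(newMaxParallelism) == 2) {
            out.putShort((short) keyGroup); // max parallelism <= 32768, so the group fits
        } else {
            out.put((byte) keyGroup);
        }
        return out.put(keyBytes).array();
    }

    public static void main(String[] args) {
        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] oldRawKey = new byte[keyBytes.length + 1];
        oldRawKey[0] = (byte) keyGroupFor(Arrays.hashCode(keyBytes), 128); // old 1-byte prefix
        System.arraycopy(keyBytes, 0, oldRawKey, 1, keyBytes.length);
        byte[] newRawKey = rewriteKey(oldRawKey, 128, 1024);
        int newGroup = ((newRawKey[0] & 0xff) << 8) | (newRawKey[1] & 0xff);
        System.out.printf("old group=%d new group=%d subtask=%d of 20%n",
                oldRawKey[0], newGroup, subtaskFor(newGroup, 1024, 20));
    }
}
```

Only the key is touched, so the value column can be streamed through as opaque bytes. This is why the value types never need to be known.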
Full Checkpoint
Full Checkpoint combines the strengths of incremental checkpoints (multi‑threaded upload) and savepoints (no dependency on earlier checkpoints). Like an incremental checkpoint, it uploads newly created state files. It also uploads the files it would otherwise reference from earlier checkpoints, so each Full Checkpoint is self‑contained. This prevents shared state files from piling up in long‑term storage and simplifies cleanup.
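A minimal sketch of the difference follows. It assumes state is a set of immutable files, such as RocksDB SST files, and it leaves out the multi‑threaded upload itself. The `Plan` record and the method names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: what incremental vs. Full Checkpoints upload and reference. */
public class CheckpointUploadPlanSketch {

    /** Files a checkpoint uploads, and already-uploaded files it merely points to. */
    public record Plan(Set<String> uploads, Set<String> referencedFromEarlier) {}

    /** Incremental: upload only new files; reference the rest from earlier checkpoints. */
    public static Plan incremental(Set<String> localFiles, Set<String> alreadyUploaded) {
        Set<String> fresh = new TreeSet<>(localFiles);
        fresh.removeAll(alreadyUploaded);
        Set<String> reused = new TreeSet<>(localFiles);
        reused.retainAll(alreadyUploaded);
        return new Plan(fresh, reused);
    }

    /** Full Checkpoint: new files plus the otherwise-shared ones, so nothing older is referenced. */
    public static Plan full(Set<String> localFiles, Set<String> alreadyUploaded) {
        Plan inc = incremental(localFiles, alreadyUploaded);
        Set<String> all = new TreeSet<>(inc.uploads());
        all.addAll(inc.referencedFromEarlier()); // uploaded again into this checkpoint's directory
        return new Plan(all, Set.of());
    }

    /** Earlier checkpoints can be deleted only if the latest one does not point into them. */
    public static boolean canDeleteEarlier(Plan latest) {
        return latest.referencedFromEarlier().isEmpty();
    }

    public static void main(String[] args) {
        Set<String> local = Set.of("000007.sst", "000009.sst", "000012.sst");
        Set<String> uploaded = Set.of("000007.sst", "000009.sst");
        System.out.println("incremental: " + incremental(local, uploaded));
        System.out.println("full:        " + full(local, uploaded));
    }
}
```

The trade‑off is more bytes uploaded per checkpoint in exchange for simple retention: once a Full Checkpoint completes, older checkpoints can be deleted without reference counting.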
Regional Checkpoint
For jobs with few ALL‑TO‑ALL connections, the DAG can be split into independent Regions. Each Region can fail and be restarted independently, which reduces the restart scope. Bilibili implemented a CheckpointHandler abstraction with two implementations, GlobalCheckpointHandler and RegionalCheckpointHandler, selectable via configuration.
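The sketch below shows why ALL‑TO‑ALL edges matter. It counts Regions as connected components of the subtask graph, using union‑find. It assumes every vertex has the same parallelism and that there are only two edge patterns. `Pattern`, `Edge`, and `countRegions` are hypothetical names, not Flink APIs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: count Regions by connecting subtasks along job edges (union-find). */
public class RegionSketch {

    public enum Pattern { FORWARD, ALL_TO_ALL }

    /** Edge between two job vertices, identified by index. */
    public record Edge(int from, int to, Pattern pattern) {}

    private static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    private static void union(int[] parent, int a, int b) {
        parent[find(parent, a)] = find(parent, b);
    }

    /** Subtask j of vertex v gets global index v * parallelism + j. */
    public static int countRegions(int vertices, int parallelism, List<Edge> edges) {
        int[] parent = new int[vertices * parallelism];
        for (int i = 0; i < parent.length; i++) {
            parent[i] = i;
        }
        for (Edge e : edges) {
            for (int j = 0; j < parallelism; j++) {
                int src = e.from() * parallelism + j;
                if (e.pattern() == Pattern.FORWARD) {
                    union(parent, src, e.to() * parallelism + j); // subtask j -> subtask j
                } else {
                    for (int k = 0; k < parallelism; k++) {
                        union(parent, src, e.to() * parallelism + k); // every pair connected
                    }
                }
            }
        }
        Set<Integer> roots = new HashSet<>();
        for (int i = 0; i < parent.length; i++) {
            roots.add(find(parent, i));
        }
        return roots.size();
    }

    public static void main(String[] args) {
        List<Edge> forwardOnly = List.of(new Edge(0, 1, Pattern.FORWARD), new Edge(1, 2, Pattern.FORWARD));
        List<Edge> withShuffle = List.of(new Edge(0, 1, Pattern.FORWARD), new Edge(1, 2, Pattern.ALL_TO_ALL));
        System.out.println("forward-only regions: " + countRegions(3, 4, forwardOnly));
        System.out.println("with one ALL_TO_ALL edge: " + countRegions(3, 4, withShuffle));
    }
}
```

With three stages at parallelism 4, forward edges give four Regions. A single ALL‑TO‑ALL edge merges everything into one.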
Tooling Development
Periodic state‑file cleanup that parses _metadata, tracks referenced state files, and removes unreferenced ones.
Metadata enrichment: logging and storing Operator ID ↔ Operator Name mappings in _metadata for post‑mortem analysis.
Metadata inspection CLI that can filter and display specific operator mappings.
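The cleanup step can be sketched as a set difference between files on storage and files referenced by retained checkpoints. The sketch assumes `_metadata` has already been parsed into per‑checkpoint file sets. The grace period is an added assumption, included so that files written by an in‑flight checkpoint are not deleted. The names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: find state files that no retained checkpoint references. */
public class OrphanStateFileSketch {

    /**
     * @param referencedByCheckpoint checkpoint ID -> files listed in its (already parsed) _metadata
     * @param filesOnStorage         file name -> last-modified time in epoch millis
     * @param graceMillis            assumption: newer files may belong to an in-flight checkpoint
     */
    public static Set<String> orphans(Map<Long, Set<String>> referencedByCheckpoint,
                                      Map<String, Long> filesOnStorage,
                                      long nowMillis, long graceMillis) {
        Set<String> referenced = new TreeSet<>();
        referencedByCheckpoint.values().forEach(referenced::addAll);
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> file : filesOnStorage.entrySet()) {
            boolean oldEnough = nowMillis - file.getValue() > graceMillis;
            if (oldEnough && !referenced.contains(file.getKey())) {
                result.add(file.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Set<String>> refs = Map.of(41L, Set.of("a.sst", "b.sst"), 42L, Set.of("b.sst", "c.sst"));
        Map<String, Long> files = Map.of("a.sst", 0L, "b.sst", 0L, "c.sst", 0L, "d.sst", 0L, "e.sst", 9_000L);
        System.out.println("safe to delete: " + orphans(refs, files, 10_000L, 5_000L));
    }
}
```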
Availability Enhancements
Hybrid HA
Bilibili introduced a hybrid HA mode that prefers ZooKeeper but falls back to HDFS when ZooKeeper is unavailable. Critical data (addresses, checkpoint IDs, job state, and the job graph) is written to both systems as needed.
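A minimal sketch of the ZooKeeper‑first, HDFS‑fallback behavior follows. It uses two in‑memory stores in place of ZooKeeper and HDFS. The `HaStore` interface, the class names, and the write‑to‑both policy are illustrative assumptions, not Flink's `HighAvailabilityServices` API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a primary-first, fallback-second HA store. */
public class HybridHaSketch {

    /** Minimal key-value view of an HA backend. */
    public interface HaStore {
        void put(String key, String value) throws Exception;
        Optional<String> get(String key) throws Exception;
    }

    /** In-memory backend; `available` simulates an outage. */
    public static class InMemoryStore implements HaStore {
        private final Map<String, String> data = new ConcurrentHashMap<>();
        public volatile boolean available = true;

        @Override
        public void put(String key, String value) throws Exception {
            if (!available) throw new Exception("store unavailable");
            data.put(key, value);
        }

        @Override
        public Optional<String> get(String key) throws Exception {
            if (!available) throw new Exception("store unavailable");
            return Optional.ofNullable(data.get(key));
        }
    }

    private final HaStore primary;  // plays the role of ZooKeeper
    private final HaStore fallback; // plays the role of HDFS

    public HybridHaSketch(HaStore primary, HaStore fallback) {
        this.primary = primary;
        this.fallback = fallback;
    }

    /** Writes to both stores; succeeds if at least one accepts the write. */
    public void put(String key, String value) {
        boolean written = false;
        try { primary.put(key, value); written = true; } catch (Exception primaryDown) { /* try fallback */ }
        try { fallback.put(key, value); written = true; } catch (Exception fallbackDown) { /* checked below */ }
        if (!written) throw new IllegalStateException("no HA store accepted " + key);
    }

    /** Reads from the primary first; uses the fallback if the primary is down or lacks the key. */
    public Optional<String> get(String key) {
        try {
            Optional<String> value = primary.get(key);
            if (value.isPresent()) return value;
        } catch (Exception primaryDown) {
            // fall through to the fallback store
        }
        try {
            return fallback.get(key);
        } catch (Exception fallbackDown) {
            throw new IllegalStateException("no HA store reachable for " + key, fallbackDown);
        }
    }

    public static void main(String[] args) {
        InMemoryStore zk = new InMemoryStore();
        InMemoryStore hdfs = new InMemoryStore();
        HybridHaSketch ha = new HybridHaSketch(zk, hdfs);
        ha.put("leader.address", "jobmanager-1:6123");
        zk.available = false; // simulate a ZooKeeper outage
        System.out.println(ha.get("leader.address").orElse("<missing>"));
    }
}
```

One caveat this sketch ignores: if ZooKeeper holds an older value than HDFS, a primary‑first read returns stale data. A real implementation would probably need to compare versions, for example checkpoint IDs.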
JobManager Recovery with Task Continuity
During JobManager failover, Bilibili prevents automatic task cancellation, keeps running tasks alive, and snapshots the ExecutionGraph to a filesystem store for later reconstruction, reducing downtime from seconds to sub‑second levels.
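Below is a minimal sketch of persisting a small ExecutionGraph view to a filesystem store and reading it back after failover. It writes a temporary file and then renames it, so a crash mid‑write never leaves a half‑written snapshot. The `Snapshot` contents (latest checkpoint ID and task‑to‑host mapping) and the Properties file format are assumptions. The real ExecutionGraph carries far more state.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical sketch: persist a minimal ExecutionGraph view so a new JobManager can adopt tasks. */
public class ExecutionGraphSnapshotSketch {

    /** Minimal stand-in for what a new JobManager needs to reattach to running tasks. */
    public record Snapshot(long latestCheckpointId, Map<String, String> taskLocations) {}

    public static void write(Snapshot s, Path file) throws IOException {
        Properties props = new Properties();
        props.setProperty("checkpoint.id", Long.toString(s.latestCheckpointId()));
        s.taskLocations().forEach((task, host) -> props.setProperty("task." + task, host));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (Writer w = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            props.store(w, "execution graph snapshot");
        }
        // Write-then-rename so readers never observe a partial snapshot.
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    public static Snapshot read(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(r);
        }
        Map<String, String> tasks = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith("task.")) {
                tasks.put(name.substring("task.".length()), props.getProperty(name));
            }
        }
        return new Snapshot(Long.parseLong(props.getProperty("checkpoint.id")), tasks);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("jm-ha").resolve("execution-graph.properties");
        write(new Snapshot(42L, Map.of("source-0", "host-a", "sink-0", "host-b")), file);
        System.out.println(read(file));
    }
}
```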
Regional Checkpoint for HDFS Sink
The StreamingFileCommitter runs with a parallelism of 1 and receives data from every upstream writer, so it joins the whole job into a single Region. Bilibili made the commit operator RegionSharable so that it can be duplicated into each Region. This makes Regional Checkpointing effective for HDFS‑sink jobs.
Rescale Partitioner for SQL Jobs
When the number of Kafka partitions and the global parallelism are not multiples of each other, the default Rebalance mode connects every upstream subtask to every downstream subtask, producing a single Region. Bilibili added a Rescale mode (plus a force‑rescale flag) that distributes data proportionally. This allows finer‑grained Regions and reduces the impact of a restart.
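Here is a sketch of one proportional mapping between parallelisms that are not multiples of each other. It is similar in spirit to Flink's rescale/pointwise wiring, but it is not claimed to match Flink's exact index arithmetic or Bilibili's implementation. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: proportional (rescale-style) channel assignment between two parallelisms. */
public class RescaleMappingSketch {

    /** Downstream subtasks that upstream subtask `i` sends to. */
    public static List<Integer> targets(int i, int upstream, int downstream) {
        List<Integer> result = new ArrayList<>();
        if (upstream >= downstream) {
            // Several upstream subtasks share one downstream subtask.
            result.add(i * downstream / upstream);
        } else {
            // Each upstream subtask owns a disjoint, contiguous block of downstream subtasks.
            for (int t = i * downstream / upstream; t < (i + 1) * downstream / upstream; t++) {
                result.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // e.g. 12 Kafka partitions read by 12 source subtasks feeding 30 downstream subtasks
        for (int i = 0; i < 12; i++) {
            System.out.println(i + " -> " + targets(i, 12, 30));
        }
    }
}
```

With 12 sources and 30 downstream subtasks, each source feeds its own block of two or three subtasks. The graph therefore splits into 12 Regions, whereas Rebalance would connect all 42 subtasks into one.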
Approximate Local Recovery
For high‑parallelism jobs with ALL‑TO‑ALL connections, Bilibili implemented single‑task recovery that restarts only the failed task, handles partial records in buffers, and coordinates upstream/downstream task state to avoid back‑pressure and data loss.
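The "partial records in buffers" problem can be sketched from the downstream side. A record may span several network buffers. If the upstream task dies mid‑record, the downstream reader must discard the half‑received bytes instead of splicing them onto the restarted task's output. The length‑prefixed framing and all names here are hypothetical; Flink's actual record serialization differs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: reassemble length-prefixed records; drop a partial one on upstream restart. */
public class PartialRecordSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Appends one buffer and returns every record it completes. */
    public List<byte[]> onBuffer(byte[] buffer) {
        pending.write(buffer, 0, buffer.length);
        byte[] bytes = pending.toByteArray();
        List<byte[]> records = new ArrayList<>();
        int pos = 0;
        while (bytes.length - pos >= 4) {
            int len = ByteBuffer.wrap(bytes, pos, 4).getInt();
            if (bytes.length - pos - 4 < len) {
                break; // record continues in a later buffer
            }
            records.add(Arrays.copyOfRange(bytes, pos + 4, pos + 4 + len));
            pos += 4 + len;
        }
        pending.reset();
        pending.write(bytes, pos, bytes.length - pos); // keep the incomplete tail
        return records;
    }

    /** Upstream restarted: its half-sent record will never complete, so discard it. */
    public int onUpstreamRestart() {
        int dropped = pending.size();
        pending.reset();
        return dropped;
    }

    /** Frames one record as [4-byte length][payload]. */
    public static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    public static void main(String[] args) {
        PartialRecordSketch reader = new PartialRecordSketch();
        byte[] record = frame("hello".getBytes(StandardCharsets.UTF_8));
        reader.onBuffer(Arrays.copyOfRange(record, 0, 6)); // only part of the record arrives
        System.out.println("dropped partial bytes: " + reader.onUpstreamRestart());
        byte[] next = frame("again".getBytes(StandardCharsets.UTF_8));
        System.out.println("records after restart: " + reader.onBuffer(next).size());
    }
}
```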
Other Optimizations
Backlog‑Based Load Balancing
Bilibili replaced the default round‑robin channel selector with a LoadBasedChannelSelector. The selector monitors Backlog Size (the number of buffers queued per subpartition, reported through credit‑based flow control) and dynamically routes records to less‑loaded downstream subtasks.
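A minimal sketch of backlog‑aware selection follows. It is not Flink's `ChannelSelector` interface: the backlog lookup is injected as a function so the logic can be shown in isolation. A rotating start index spreads ties round‑robin. The class name and constructor are hypothetical.

```java
import java.util.function.IntUnaryOperator;

/** Hypothetical sketch: pick the downstream channel with the smallest backlog. */
public class LoadBasedChannelSelectorSketch {
    private final int numberOfChannels;
    private final IntUnaryOperator backlogOf; // channel -> queued buffers (assumed observable)
    private int next;                         // rotating start so equal backlogs alternate

    public LoadBasedChannelSelectorSketch(int numberOfChannels, IntUnaryOperator backlogOf) {
        this.numberOfChannels = numberOfChannels;
        this.backlogOf = backlogOf;
    }

    public int selectChannel() {
        int best = next;
        int bestBacklog = backlogOf.applyAsInt(best);
        for (int step = 1; step < numberOfChannels; step++) {
            int channel = (next + step) % numberOfChannels;
            int backlog = backlogOf.applyAsInt(channel);
            if (backlog < bestBacklog) {
                best = channel;
                bestBacklog = backlog;
            }
        }
        next = (next + 1) % numberOfChannels;
        return best;
    }

    public static void main(String[] args) {
        int[] backlogs = {5, 0, 3};
        LoadBasedChannelSelectorSketch selector =
                new LoadBasedChannelSelectorSketch(backlogs.length, c -> backlogs[c]);
        System.out.println("selected channel: " + selector.selectChannel());
    }
}
```

Scanning every channel per record costs O(n). A production selector would likely cache or sample backlog values rather than read them on each call.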
Large‑Scale Cluster Maintenance
To safely decommission machines in a 1,000‑node YARN cluster, Bilibili introduced a blacklist‑aware restart workflow with three steps. First, the machines are removed from YARN scheduling via node labels. Next, replacement nodes are added. Finally, the jobs running on the blacklisted hosts are restarted one by one through Flink's REST API, so service continues throughout.
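The sequential restart step can be sketched as a loop over jobs that still have tasks on blacklisted hosts. The `Cluster` interface stands in for the YARN and Flink REST calls. Its in‑memory implementation, which reschedules a restarted job onto a replacement host, is a hypothetical test double.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: restart, one at a time, every job with a task on a blacklisted host. */
public class DecommissionSketch {

    /** Minimal cluster view; a real version would query YARN and call Flink's REST API. */
    public interface Cluster {
        List<String> taskHosts(String jobId);
        void restart(String jobId);
    }

    public static List<String> drain(Cluster cluster, List<String> jobIds, Set<String> blacklist) {
        List<String> restarted = new ArrayList<>();
        for (String job : jobIds) {
            if (cluster.taskHosts(job).stream().anyMatch(blacklist::contains)) {
                cluster.restart(job); // sequential: the next job waits for this call to return
                restarted.add(job);
            }
        }
        return restarted;
    }

    /** Test double: a restart moves all of a job's tasks onto the replacement host. */
    public static class InMemoryCluster implements Cluster {
        private final Map<String, List<String>> hosts = new HashMap<>();
        private final String replacementHost;

        public InMemoryCluster(String replacementHost) {
            this.replacementHost = replacementHost;
        }

        public void deploy(String jobId, List<String> taskHosts) {
            hosts.put(jobId, new ArrayList<>(taskHosts));
        }

        @Override
        public List<String> taskHosts(String jobId) {
            return List.copyOf(hosts.get(jobId));
        }

        @Override
        public void restart(String jobId) {
            int tasks = hosts.get(jobId).size();
            hosts.put(jobId, new ArrayList<>(Collections.nCopies(tasks, replacementHost)));
        }
    }

    public static void main(String[] args) {
        InMemoryCluster cluster = new InMemoryCluster("new-node-1");
        cluster.deploy("job-a", List.of("old-node-1", "node-7"));
        cluster.deploy("job-b", List.of("node-8"));
        System.out.println("restarted: " + drain(cluster, List.of("job-a", "job-b"), Set.of("old-node-1")));
    }
}
```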
Future Work
Plans include extending checkpoint compatibility to aggregation‑metric changes, async dimension‑join schema evolution, adapting Regional Checkpoint for HDFS Sink failover, and implementing zero‑downtime scaling without job restarts.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.