How Bilibili Supercharged Flink: Checkpoint, HA, and Runtime Optimizations
This article details Bilibili's extensive enhancements to Flink's runtime—including checkpoint recoverability, operator ID stability, state processor extensions, hybrid high‑availability, regional checkpointing, and load‑based channel selection—to improve scalability, reliability, and operational efficiency of large‑scale streaming jobs.
Background
Bilibili runs Flink jobs on three types of YARN clusters (pure physical, Kafka‑mixed, and K8s‑mixed) totaling thousands of servers and millions of cores, supporting AI, advertising, data warehousing, and other workloads. Job parallelism reaches up to 2000, and at this scale checkpointing becomes a major limitation.
Checkpoint Improvements
Recoverability
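To make checkpoints more robust, Bilibili introduced a new Operator ID generation algorithm, StreamGraphHasherV3, that ignores downstream chaining. Flink's default hasher (StreamGraphHasherV2) folds an operator's chainable outputs into its hash, and whether two operators can chain depends on their parallelism matching, so a parallelism change could change Operator IDs. Leaving chaining out of the hash keeps IDs stable when parallelism changes, which greatly increases checkpoint recoverability.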
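The effect of dropping chaining from the hash can be sketched with a simplified model. This is not Flink's StreamGraphHasherV2/V3 code: the `Node` record, the "single input and equal parallelism" chainability rule, and the use of MD5 instead of the Murmur3 hash Flink uses are all assumptions made for illustration. With `includeChaining = true` the sketch behaves like V2, so changing the parallelism of `map` changes the ID of `source`; with `false` (the V3 idea) every ID stays the same.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HexFormat;
import java.util.List;

/** Simplified, hypothetical model of Flink-style operator ID hashing. */
public class OperatorIdSketch {

    /** A stream-graph node; nodes are listed in topological order, inputs are node indices. */
    public record Node(String name, int parallelism, List<Integer> inputs) {}

    /** includeChaining=true mimics V2 (chainable outputs affect the hash); false mimics V3. */
    public static List<String> operatorIds(List<Node> nodes, boolean includeChaining) {
        List<byte[]> hashes = new ArrayList<>();
        for (int i = 0; i < nodes.size(); i++) {
            Node node = nodes.get(i);
            MessageDigest md = md5();
            md.update(intBytes(i)); // position in traversal order, not a global counter
            if (includeChaining) {
                for (int j = i + 1; j < nodes.size(); j++) {
                    Node out = nodes.get(j);
                    // Simplified chainability rule: single-input downstream with equal parallelism
                    if (out.inputs().equals(List.of(i)) && out.parallelism() == node.parallelism()) {
                        md.update(intBytes(i));
                    }
                }
            }
            byte[] hash = md.digest();
            // Mix in input hashes so the ID also reflects the upstream topology
            for (int in : node.inputs()) {
                byte[] other = hashes.get(in);
                for (int k = 0; k < hash.length; k++) {
                    hash[k] = (byte) (hash[k] * 37 ^ other[k]);
                }
            }
            hashes.add(hash);
        }
        List<String> ids = new ArrayList<>();
        for (byte[] h : hashes) {
            ids.add(HexFormat.of().formatHex(h));
        }
        return ids;
    }

    private static byte[] intBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    private static MessageDigest md5() {
        try {
            return MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```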
Maximum Parallelism Calculation
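Flink's default maximum parallelism is 1.5 × the operator parallelism, rounded up to a power of two and clamped to the range [128, 32768]. Bilibili raised the lower bound to 1024 and multiplied the estimate by ten, leaving more room to scale a job up later without changing its maximum parallelism. The updated calculation is shown below.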
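```java
import org.apache.flink.util.MathUtils;

// If the user sets the max parallelism explicitly, that value is used.
// Otherwise the default is computed as follows (e.g. parallelism 100 -> 2048, 2000 -> 32768):
static int computeDefaultMaxParallelism(int operatorParallelism) {
    return Math.min(
            Math.max(
                    // 1.5x parallelism, scaled by 10, rounded up to a power of two
                    MathUtils.roundUpToPowerOfTwo((operatorParallelism + (operatorParallelism / 2)) * 10),
                    1024),  // lower bound (Flink's default lower bound is 128)
            32768);         // upper bound, unchanged from Flink
}
```
State Processor API Extensions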
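Existing jobs already have checkpoints written under the old settings. To migrate them, Bilibili extended the State Processor API to read all operator IDs from the checkpoint metadata, generate custom StateDescriptors, and deserialize only the keys (values are kept as raw byte arrays). Because a key's key group is determined by the key and the maximum parallelism alone, this is enough to recompute key groups and keep checkpoints compatible after parallelism changes.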
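Why deserializing only keys suffices can be sketched in plain Java. The subtask formula `keyGroup * parallelism / maxParallelism` matches Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup`; the key-group hash below is a stand-in (Flink applies a murmur-based mix to `key.hashCode()` before the modulo), and `KeyGroupRecompute`/`redistribute` are hypothetical names, not part of the State Processor API. Values travel as opaque `byte[]` and are never deserialized.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: reassign keyed state whose values stay as raw bytes. */
public class KeyGroupRecompute {

    /** Stand-in key-group hash; Flink uses murmurHash(key.hashCode()) % maxParallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    /** Same range formula Flink uses to map a key group to a subtask index. */
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Groups raw value bytes by the subtask that owns each key under the new settings. */
    public static Map<Integer, List<byte[]>> redistribute(
            Map<String, byte[]> state, int newMaxParallelism, int newParallelism) {
        Map<Integer, List<byte[]>> bySubtask = new HashMap<>();
        for (Map.Entry<String, byte[]> e : state.entrySet()) {
            int keyGroup = keyGroupFor(e.getKey(), newMaxParallelism);
            int subtask = subtaskForKeyGroup(keyGroup, newMaxParallelism, newParallelism);
            // The value is copied as-is; only the key was needed to find its new owner
            bySubtask.computeIfAbsent(subtask, s -> new ArrayList<>()).add(e.getValue());
        }
        return bySubtask;
    }
}
```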
Operator Name Assisted Recovery
When the Operator IDs stored in a checkpoint no longer match those of the new DAG, recovery fails. In that case operator names act as a bridge: each checkpointed Operator ID is mapped to the DAG operator with the same name. This allows recovery for SQL jobs, where operator names are unique.
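A minimal sketch of the name bridge, assuming the checkpoint side provides an Operator ID to name map (for example from the enriched metadata described under Tooling Development) and the new DAG provides a name to Operator ID map. `NameBridge` and `remap` are hypothetical names. Names that occur more than once in the checkpoint are skipped, since they cannot be mapped unambiguously.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: map checkpointed Operator IDs onto new DAG operators by name. */
public class NameBridge {

    /**
     * @param checkpointIdToName Operator ID -> operator name, as recorded with the checkpoint
     * @param dagNameToId        operator name -> Operator ID in the newly submitted DAG
     * @return old Operator ID -> new Operator ID for every unambiguous name match
     */
    public static Map<String, String> remap(Map<String, String> checkpointIdToName,
                                            Map<String, String> dagNameToId) {
        // Collect names that appear more than once on the checkpoint side
        Set<String> seen = new HashSet<>();
        Set<String> duplicated = new HashSet<>();
        for (String name : checkpointIdToName.values()) {
            if (!seen.add(name)) {
                duplicated.add(name);
            }
        }
        Map<String, String> mapping = new HashMap<>();
        for (Map.Entry<String, String> e : checkpointIdToName.entrySet()) {
            String newId = dagNameToId.get(e.getValue());
            if (newId != null && !duplicated.contains(e.getValue())) {
                mapping.put(e.getKey(), newId);
            }
        }
        return mapping;
    }
}
```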
Checkpoint Optimization
Full Checkpoint
Full Checkpoint combines the efficiency of incremental checkpoints with the reliability of savepoints. It uploads new (incremental) files with multiple threads and also uploads the files it would otherwise reference from earlier checkpoints, so each checkpoint is self‑contained and no long dependency chain on previous checkpoints builds up.
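The difference in which files get uploaded can be sketched as follows. This is a hypothetical illustration, not Bilibili's implementation: `filesToUpload` only contrasts the two selection rules, and the `uploader` function stands in for a real HDFS client. Uploads run in parallel on a fixed thread pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch of incremental vs. self-contained ("full") checkpoint uploads. */
public class FullCheckpointSketch {

    /**
     * @param localFiles      immutable state files (e.g. SST files) in the current snapshot
     * @param alreadyUploaded files uploaded by earlier checkpoints
     * @param full            true: upload everything so the checkpoint depends on nothing older
     */
    public static Set<String> filesToUpload(Set<String> localFiles, Set<String> alreadyUploaded, boolean full) {
        Set<String> result = new TreeSet<>(localFiles);
        if (!full) {
            // Incremental: upload only new files and reference the rest from older checkpoints
            result.removeAll(alreadyUploaded);
        }
        return result;
    }

    /** Uploads files in parallel; {@code uploader} is a placeholder returning the remote path. */
    public static List<String> uploadAll(Set<String> files, Function<String, String> uploader, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String file : files) {
                futures.add(pool.submit(() -> uploader.apply(file)));
            }
            List<String> remotePaths = new ArrayList<>();
            for (Future<String> f : futures) {
                remotePaths.add(f.get());
            }
            return remotePaths;
        } catch (ExecutionException e) {
            throw new IllegalStateException("upload failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while uploading", e);
        } finally {
            pool.shutdown();
        }
    }
}
```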
Regional Checkpoint
For jobs with few or no all‑to‑all connections, the DAG is divided into independent regions and checkpoints are taken per region, so a checkpoint failure in one region does not fail the checkpoint of the others. The region division logic and handling are illustrated in the following diagrams.
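Region division can be sketched as union-find over subtasks: a forward edge joins subtask i upstream with subtask i downstream, while an all‑to‑all edge joins every pair and therefore merges everything it touches. The model below is a hypothetical simplification (only FORWARD edges between equal-parallelism operators and ALL_TO_ALL edges; subtasks named `vertex#index`), not Bilibili's region builder.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: split a job into checkpoint regions with union-find. */
public class RegionSplitter {

    public enum EdgeType { FORWARD, ALL_TO_ALL }

    public record Edge(String from, String to, EdgeType type) {}

    private final Map<String, String> parent = new HashMap<>();

    private String find(String x) {
        parent.putIfAbsent(x, x);
        String p = parent.get(x);
        if (!p.equals(x)) {
            p = find(p);
            parent.put(x, p); // path compression
        }
        return p;
    }

    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    /** Counts regions; FORWARD edges are assumed to connect operators of equal parallelism. */
    public static int countRegions(Map<String, Integer> parallelism, List<Edge> edges) {
        RegionSplitter uf = new RegionSplitter();
        parallelism.forEach((v, p) -> {
            for (int i = 0; i < p; i++) uf.find(v + "#" + i);
        });
        for (Edge e : edges) {
            int up = parallelism.get(e.from());
            int down = parallelism.get(e.to());
            for (int i = 0; i < up; i++) {
                for (int j = 0; j < down; j++) {
                    // FORWARD links subtask i only to subtask i; ALL_TO_ALL links every pair
                    if (e.type() == EdgeType.ALL_TO_ALL || i == j) {
                        uf.union(e.from() + "#" + i, e.to() + "#" + j);
                    }
                }
            }
        }
        Set<String> roots = new HashSet<>();
        for (String node : uf.parent.keySet().toArray(new String[0])) {
            roots.add(uf.find(node));
        }
        return roots.size();
    }
}
```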
Tooling Development
Utilities were built for checkpoint maintenance, including periodic state file cleanup, checkpoint metadata enrichment (logging Operator ID ↔ Name mappings), and metadata inspection tools that can filter by operator ID or name.
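The core of a periodic state file cleanup can be sketched as a set difference with a safety margin. This is a hypothetical illustration: it assumes the tool can list files with modification times and collect every file referenced by retained checkpoints' metadata; the age threshold keeps files of a checkpoint that is still being written from being deleted.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: select state files no retained checkpoint references. */
public class OrphanFileFinder {

    /**
     * @param filesWithMtime path -> last-modified time (epoch millis), listed from the checkpoint directory
     * @param referenced     paths referenced by any retained checkpoint's metadata
     * @param nowMillis      current time
     * @param minAgeMillis   grace period protecting files of an in-progress checkpoint
     */
    public static Set<String> deletable(Map<String, Long> filesWithMtime, Set<String> referenced,
                                        long nowMillis, long minAgeMillis) {
        Set<String> result = new TreeSet<>();
        filesWithMtime.forEach((path, mtime) -> {
            boolean orphaned = !referenced.contains(path);
            boolean oldEnough = nowMillis - mtime >= minAgeMillis;
            if (orphaned && oldEnough) {
                result.add(path);
            }
        });
        return result;
    }
}
```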
Availability Enhancements
Hybrid HA
To cope with ZooKeeper instability at scale, a hybrid HA mechanism uses ZooKeeper while it is healthy and falls back to HDFS when ZooKeeper fails. Critical data (the ResourceManager address, checkpoint IDs, job state, and the job graph) is written to both systems, with versioning to keep the two copies consistent.
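A minimal sketch of dual writes with versions, under assumptions not stated in the source: a write succeeds if at least one backend accepts it, the new version is the latest known version plus one, and a read returns the highest-versioned copy among the reachable backends (ZooKeeper wins ties). `Backend` and `InMemoryBackend` are hypothetical stand-ins for real ZooKeeper and HDFS clients.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of dual-writing HA metadata to ZooKeeper and HDFS with versions. */
public class HybridHaStore {

    public record Versioned(long version, String value) {}

    /** Minimal storage abstraction; real backends would wrap ZooKeeper and HDFS clients. */
    public interface Backend {
        void put(String key, Versioned v) throws Exception;
        Optional<Versioned> get(String key) throws Exception;
    }

    /** In-memory backend for demonstration; setting {@code down} simulates an outage. */
    public static class InMemoryBackend implements Backend {
        private final Map<String, Versioned> data = new HashMap<>();
        public boolean down = false;

        public void put(String key, Versioned v) throws Exception {
            if (down) throw new Exception("backend unavailable");
            data.put(key, v);
        }

        public Optional<Versioned> get(String key) throws Exception {
            if (down) throw new Exception("backend unavailable");
            return Optional.ofNullable(data.get(key));
        }
    }

    private final Backend zk;
    private final Backend hdfs;

    public HybridHaStore(Backend zk, Backend hdfs) {
        this.zk = zk;
        this.hdfs = hdfs;
    }

    /** Writes a new version to both backends; fails only if neither accepts it. */
    public synchronized void put(String key, String value) {
        long version = latest(key).map(v -> v.version() + 1).orElse(1L);
        Versioned entry = new Versioned(version, value);
        boolean zkOk = tryPut(zk, key, entry);
        boolean hdfsOk = tryPut(hdfs, key, entry);
        if (!zkOk && !hdfsOk) {
            throw new IllegalStateException("both HA backends failed for " + key);
        }
    }

    /** Returns the value with the highest version among the reachable backends. */
    public synchronized Optional<String> get(String key) {
        return latest(key).map(Versioned::value);
    }

    private Optional<Versioned> latest(String key) {
        Optional<Versioned> fromZk = tryGet(zk, key);
        Optional<Versioned> fromHdfs = tryGet(hdfs, key);
        if (fromZk.isEmpty()) return fromHdfs;
        if (fromHdfs.isEmpty()) return fromZk;
        // A stale ZooKeeper copy loses to a newer HDFS copy
        return fromZk.get().version() >= fromHdfs.get().version() ? fromZk : fromHdfs;
    }

    private static boolean tryPut(Backend b, String key, Versioned v) {
        try {
            b.put(key, v);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    private static Optional<Versioned> tryGet(Backend b, String key) {
        try {
            return b.get(key);
        } catch (Exception e) {
            return Optional.empty();
        }
    }
}
```

For example, if ZooKeeper is down during a write and later recovers with an older copy, the version check still returns the newer value written to HDFS.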
JobManager Recovery (Reconciliation)
To survive a JobManager failover, snapshots of the ExecutionGraph are persisted to a filesystem store. After failover, the new JobManager reconstructs the ExecutionGraph from the snapshot and reconciles it with the tasks that are still running, so processing resumes without a full job restart.
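The reconciliation step can be sketched as comparing the recovered snapshot with what the TaskManagers report after failover. This is a hypothetical simplification: task IDs are plain strings, `TaskState` has only three values, and tasks that are still running are adopted while unfinished tasks that nobody reports are redeployed.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of reconciling a recovered ExecutionGraph snapshot with live tasks. */
public class Reconciler {

    public enum TaskState { RUNNING, FINISHED, FAILED }

    /** adopt: keep running and re-attach; redeploy: must be scheduled again. */
    public record Plan(Set<String> adopt, Set<String> redeploy) {}

    /**
     * @param snapshot task ID -> state from the persisted ExecutionGraph snapshot
     * @param reported task IDs that TaskManagers report as still running after failover
     */
    public static Plan reconcile(Map<String, TaskState> snapshot, Set<String> reported) {
        Set<String> adopt = new TreeSet<>();
        Set<String> redeploy = new TreeSet<>();
        snapshot.forEach((task, state) -> {
            if (state == TaskState.FINISHED) {
                return; // nothing left to do for this task
            }
            if (reported.contains(task)) {
                adopt.add(task);
            } else {
                redeploy.add(task);
            }
        });
        return new Plan(adopt, redeploy);
    }
}
```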
Regional Checkpoint for HDFS Sink
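A new RegionSharable interface allows the StreamingFileCommitter, a single‑parallelism operator that receives input from every writer subtask, to belong to multiple regions at once. Without it, the committer would join all writers into a single region; with it, regional checkpointing can be applied to HDFS sink jobs.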
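```java
// RegionShareOperator.java
public interface RegionShareOperator {
    // Called when a regional checkpoint completes; the detail describes any failed tasks
    void notifyRegionalCkComplete(RegionCheckpointFailedDetail detail);
}

// RegionCheckpointFailedDetail.java
import java.util.Set;

public class RegionCheckpointFailedDetail {
    long checkpointId;
    FailedScope scope;                // upstream, downstream, or both
    Set<FailedTaskInfo> failedTasks;

    enum FailedScope {UP_STREAM, DOWN_STREAM, BOTH}

    enum FailedType {ERROR, EXPIRED}

    static class FailedTaskInfo {
        int subTaskId;
        String taskName;
        FailedType reason;
        String message;
    }
}
```
Rescale Partitioner for SQL Jobs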
When the number of Kafka partitions is not a multiple of the job parallelism, Bilibili ignores the user‑set source parallelism and introduces a Rescale mode that adjusts how the job is divided into regions, improving availability for SQL jobs.
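Rescale wiring matters because Flink's default between operators of different parallelism is rebalance, which is all‑to‑all and collapses the job into one region. With rescale, each upstream subtask talks only to a contiguous block of downstream subtasks, giving min(upstream, downstream) separate groups. The sketch approximates this with a simple proportional split; it is not Flink's RescalePartitioner code, and Flink's exact block boundaries may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of rescale (pointwise) wiring between two operators. */
public class RescaleWiring {

    /** Returns, for each upstream subtask, the downstream subtasks it sends to. */
    public static List<List<Integer>> targets(int upstream, int downstream) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < upstream; i++) {
            result.add(new ArrayList<>());
        }
        if (upstream >= downstream) {
            // Each downstream subtask j reads a contiguous block of upstream subtasks
            for (int j = 0; j < downstream; j++) {
                for (int i = j * upstream / downstream; i < (j + 1) * upstream / downstream; i++) {
                    result.get(i).add(j);
                }
            }
        } else {
            // Each upstream subtask i writes to a contiguous block of downstream subtasks
            for (int i = 0; i < upstream; i++) {
                for (int j = i * downstream / upstream; j < (i + 1) * downstream / upstream; j++) {
                    result.get(i).add(j);
                }
            }
        }
        return result;
    }
}
```

For example, 10 Kafka-partition source subtasks feeding 4 downstream subtasks split into 4 independent groups instead of one all‑to‑all region.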
Approximate Local Recovery
For high‑parallelism jobs with all‑to‑all connections, a single‑point recovery mechanism restarts only the failed task instead of the whole job. It discards partially transmitted records and cleans up in‑flight buffers, avoiding long downtime.
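The partial-record problem can be sketched with a receiver that reassembles length-prefixed records from network buffers. If the upstream task dies mid-record, the leftover bytes would corrupt the next record sent by the restarted task, so they must be dropped. The framing (4-byte length prefix) and the `onUpstreamRestart` hook are hypothetical; Flink's real record deserializers and network stack are more involved.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reassemble length-prefixed records and drop partial ones on upstream restart. */
public class ChannelReassembler {

    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one network buffer; returns the records it completes. */
    public List<byte[]> onBuffer(byte[] buffer) {
        pending.write(buffer, 0, buffer.length);
        List<byte[]> complete = new ArrayList<>();
        ByteBuffer bytes = ByteBuffer.wrap(pending.toByteArray());
        while (bytes.remaining() >= 4) {
            bytes.mark();
            int len = bytes.getInt();
            if (bytes.remaining() < len) {
                bytes.reset(); // record not complete yet, wait for more data
                break;
            }
            byte[] record = new byte[len];
            bytes.get(record);
            complete.add(record);
        }
        // Keep only the unconsumed tail
        byte[] rest = new byte[bytes.remaining()];
        bytes.get(rest);
        pending.reset();
        pending.write(rest, 0, rest.length);
        return complete;
    }

    /** The half-received record from a restarted upstream can never complete, so discard it. */
    public void onUpstreamRestart() {
        pending.reset();
    }
}
```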
Other Optimizations
Backlog‑Based Load Balancing
Replacing round‑robin channel selection with a BacklogSize‑driven strategy dynamically balances load across downstream subtasks, reducing backpressure.
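A backlog-driven selector can be sketched as "pick the channel with the smallest backlog, breaking ties round‑robin". This does not implement Flink's `ChannelSelector` interface; how backlog sizes are collected is assumed (the caller updates them via the hypothetical `updateBacklog`). When all backlogs are equal, the sketch degrades to plain round‑robin.

```java
/** Hypothetical sketch: choose the downstream channel with the smallest backlog. */
public class BacklogChannelSelector {

    private final int[] backlog; // queued buffers per downstream channel, kept up to date by the caller
    private int next = 0;        // rotating start index so ties are spread round-robin

    public BacklogChannelSelector(int numChannels) {
        this.backlog = new int[numChannels];
    }

    public void updateBacklog(int channel, int size) {
        backlog[channel] = size;
    }

    /** Returns the least-loaded channel, scanning from a rotating start to break ties. */
    public int selectChannel() {
        int n = backlog.length;
        int best = next;
        for (int k = 1; k < n; k++) {
            int c = (next + k) % n;
            if (backlog[c] < backlog[best]) {
                best = c;
            }
        }
        next = (next + 1) % n;
        return best;
    }
}
```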
Large‑Scale Cluster Maintenance
A black‑list‑aware restart workflow safely drains jobs from machines slated for maintenance, ensuring continuous service during node removal.
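The planning side of such a workflow can be sketched as: find every job with containers on a host slated for maintenance, restart it with that host excluded, and remove the host only once nothing runs on it. This is a hypothetical model (job to host sets as plain maps, `Restart` as a plan entry), not Bilibili's tooling or a YARN API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: plan job restarts off hosts scheduled for maintenance. */
public class MaintenanceDrainPlanner {

    /** A job to restart, with the hosts its new containers must avoid. */
    public record Restart(String job, Set<String> excludedHosts) {}

    /** Returns one restart per job that has containers on a blacklisted host. */
    public static List<Restart> plan(Map<String, Set<String>> jobHosts, Set<String> blacklist) {
        List<Restart> restarts = new ArrayList<>();
        for (String job : new TreeSet<>(jobHosts.keySet())) { // sorted for a deterministic order
            boolean affected = jobHosts.get(job).stream().anyMatch(blacklist::contains);
            if (affected) {
                restarts.add(new Restart(job, Set.copyOf(blacklist)));
            }
        }
        return restarts;
    }

    /** A host can be taken out once no job still has containers on it. */
    public static boolean safeToRemove(String host, Map<String, Set<String>> jobHosts) {
        return jobHosts.values().stream().noneMatch(h -> h.contains(host));
    }
}
```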
Future Work
Plans include extending checkpoint compatibility to aggregation and async dimension‑join scenarios, adapting Regional Checkpoint for HDFS sink failover, and implementing zero‑downtime scaling of parallelism.
This article has been distilled and summarized from source material, then republished for learning and reference.
Bilibili Tech
Provides introductions and tutorials on Bilibili-related technologies.
