
How Bilibili Supercharged Flink: Checkpoint, HA, and Runtime Optimizations

This article details Bilibili's extensive enhancements to Flink's runtime—including checkpoint recoverability, operator ID stability, state processor extensions, hybrid high‑availability, regional checkpointing, and load‑based channel selection—to improve scalability, reliability, and operational efficiency of large‑scale streaming jobs.

Bilibili Tech

Background

Bilibili runs Flink jobs on three types of YARN clusters: dedicated physical clusters, clusters co-located with Kafka, and clusters co-located with K8s. Together they span thousands of servers and millions of cores and support AI, advertising, data warehousing, and other workloads. The largest jobs run at a parallelism of 2000, and at this scale the platform runs into checkpoint-related limitations.

Checkpoint Improvements

Recoverability

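To make checkpoints more robust, Bilibili introduced a new Operator ID generation algorithm, StreamGraphHasherV3. Flink's default hasher, StreamGraphHasherV2, folds an operator's chainable downstream edges into its ID. Operators can only chain when their parallelism matches, so changing parallelism can change Operator IDs and break state restoration. StreamGraphHasherV3 ignores downstream chaining. As a result, IDs stay stable when parallelism changes, which greatly increases checkpoint recoverability.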
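The sketch below shows why chaining matters. It is a simplified, position-based operator hash in the spirit of StreamGraphHasherV2, not Bilibili's StreamGraphHasherV3. Each node hashes its traversal position, optionally repeated once per chainable output, and then folds in the hashes of its inputs. Several parts are simplifying assumptions:

- the `SketchNode` type;
- the use of MD5 instead of the hash function Flink uses;
- a chainability rule that only compares parallelism.

With `includeChaining` off, changing a downstream operator's parallelism leaves the upstream ID unchanged.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stream node: just enough structure to illustrate the hash.
final class SketchNode {
    final String name;
    final int parallelism;
    final List<SketchNode> inputs = new ArrayList<>();
    final List<SketchNode> outputs = new ArrayList<>();

    SketchNode(String name, int parallelism) {
        this.name = name;
        this.parallelism = parallelism;
    }

    static void connect(SketchNode from, SketchNode to) {
        from.outputs.add(to);
        to.inputs.add(from);
    }
}

final class OperatorIdHasher {
    // Hashes nodes given in topological order. includeChaining=true adds a V2-style
    // extra term per chainable output; false leaves chaining out of the ID entirely.
    static Map<SketchNode, byte[]> hashAll(List<SketchNode> topoOrder, boolean includeChaining) {
        Map<SketchNode, byte[]> hashes = new HashMap<>();
        for (SketchNode node : topoOrder) {
            int chainableOutputs = 0;
            if (includeChaining) {
                for (SketchNode out : node.outputs) {
                    // Simplified chainability rule (assumption): equal parallelism only.
                    if (out.parallelism == node.parallelism) chainableOutputs++;
                }
            }
            // Local part: traversal position, repeated once per chainable output.
            ByteBuffer local = ByteBuffer.allocate(4 * (1 + chainableOutputs));
            for (int i = 0; i <= chainableOutputs; i++) local.putInt(hashes.size());
            byte[] hash = md5(local.array());
            // Fold in upstream hashes; inputs were hashed earlier in topological order.
            for (SketchNode in : node.inputs) {
                byte[] other = hashes.get(in);
                for (int j = 0; j < hash.length; j++) hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
            hashes.put(node, hash);
        }
        return hashes;
    }

    private static byte[] md5(byte[] input) {
        try {
            return MessageDigest.getInstance("MD5").digest(input);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SketchNode src = new SketchNode("source", 2);
        SketchNode map = new SketchNode("map", 4);
        SketchNode.connect(src, map);
        System.out.println(hashAll(List.of(src, map), false).size() + " operator IDs computed");
    }
}
```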

Maximum Parallelism Calculation

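Bilibili also adjusted the default max parallelism, which Flink computes when the user does not set one explicitly. The lower bound was raised to 1024. In addition, 1.5 times the operator parallelism is multiplied by ten before being rounded up to a power of two. The updated calculation is: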

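import org.apache.flink.util.MathUtils;

// If the user sets max parallelism explicitly, that value is used.
// Otherwise the default is computed as follows:
static int computeDefaultMaxParallelism(int operatorParallelism) {
    return Math.min(
        Math.max(
            // 1.5x the parallelism, times 10, rounded up to a power of two
            MathUtils.roundUpToPowerOfTwo((operatorParallelism + (operatorParallelism / 2)) * 10),
            1024),   // lower bound (Flink's stock lower bound is 128)
        32768);      // upper bound on max parallelism
}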
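The extra headroom matters because max parallelism fixes the number of key groups, and it cannot change once keyed state exists. The standalone sketch below compares Flink's stock default (lower bound 128) with the adjusted formula. `roundUpToPowerOfTwo` is reimplemented with the usual bit trick so the example needs no Flink dependency.

```java
// Compares Flink's stock default max parallelism with the adjusted formula.
final class MaxParallelismDemo {
    // Smallest power of two >= x (for 1 <= x <= 2^30).
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    // Stock default: 1.5x parallelism, lower bound 128, upper bound 32768.
    static int flinkDefault(int p) {
        return Math.min(Math.max(roundUpToPowerOfTwo(p + p / 2), 128), 32768);
    }

    // Adjusted: additional 10x headroom, lower bound 1024, same upper bound.
    static int adjusted(int p) {
        return Math.min(Math.max(roundUpToPowerOfTwo((p + p / 2) * 10), 1024), 32768);
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 10, 100, 2000}) {
            System.out.printf("p=%d stock=%d adjusted=%d%n", p, flinkDefault(p), adjusted(p));
        }
    }
}
```

Example results:

- Parallelism 100 gives 256 under the stock formula and 2048 under the adjusted one.
- At parallelism 2000, the adjusted value already reaches the 32768 cap.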

State Processor API Extensions

To migrate existing jobs, Bilibili extended the State Processor API in three ways. It reads all operator IDs from checkpoint metadata, generates custom StateDescriptors, and deserializes only the keys, treating values as opaque byte arrays. With the keys available, each key can be reassigned to its key group under the new max parallelism. This keeps checkpoints compatible after parallelism changes.
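Below is a minimal sketch of the key-only regrouping step. It assumes keys are UTF-8 strings and each raw entry is a (key bytes, value bytes) pair; `RawEntry` is hypothetical. Key-group assignment follows what we understand Flink's `KeyGroupRangeAssignment` to do: murmur-mix `key.hashCode()`, then take the result modulo max parallelism. It is reimplemented here so the sketch has no Flink dependency, so verify it against Flink's `MathUtils` before relying on it. Value bytes are never deserialized.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyGroupRegrouper {
    // Hypothetical raw state entry: the key is decoded, the value stays opaque.
    static final class RawEntry {
        final byte[] keyBytes;
        final byte[] valueBytes;
        RawEntry(byte[] keyBytes, byte[] valueBytes) {
            this.keyBytes = keyBytes;
            this.valueBytes = valueBytes;
        }
    }

    // Murmur3-style mixing of a single int, returning a non-negative value.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) return code;
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Which subtask owns a key group at the given parallelism.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Decodes only the key and buckets entries by their key group under the new max parallelism.
    static Map<Integer, List<RawEntry>> regroup(List<RawEntry> entries, int newMaxParallelism) {
        Map<Integer, List<RawEntry>> byGroup = new TreeMap<>();
        for (RawEntry e : entries) {
            String key = new String(e.keyBytes, StandardCharsets.UTF_8);
            int group = keyGroupFor(key, newMaxParallelism);
            byGroup.computeIfAbsent(group, g -> new ArrayList<>()).add(e); // value bytes untouched
        }
        return byGroup;
    }

    public static void main(String[] args) {
        List<RawEntry> entries = List.of(
            new RawEntry("user-1".getBytes(StandardCharsets.UTF_8), new byte[] {1}),
            new RawEntry("user-2".getBytes(StandardCharsets.UTF_8), new byte[] {2}));
        regroup(entries, 1024).forEach((group, list) -> System.out.println(
            "key group " + group + " -> subtask " + operatorIndexFor(group, 1024, 20)));
    }
}
```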

Operator Name Assisted Recovery

When Operator IDs no longer match and checkpoint recovery fails, operator names serve as a bridge: the operator IDs recorded in the checkpoint are mapped to DAG operators by name. This works for SQL jobs, where operator names are unique.
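A sketch of name-based remapping follows. It assumes the enriched checkpoint metadata provides operator ID → name pairs and the new DAG provides name → ID pairs; `NameBridgedRecovery` and its inputs are hypothetical. It refuses to map when the checkpoint's operator names are not unique, which is why the approach is limited to jobs, such as SQL jobs, whose operator names are unique.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class NameBridgedRecovery {
    // checkpointIdToName: operator ID -> name, from (enriched) checkpoint metadata.
    // dagNameToId: operator name -> operator ID in the newly submitted DAG.
    // Returns checkpoint operator ID -> new DAG operator ID.
    static Map<String, String> remap(Map<String, String> checkpointIdToName, Map<String, String> dagNameToId) {
        Set<String> names = new HashSet<>();
        for (String name : checkpointIdToName.values()) {
            // Names are only a safe bridge if each one identifies exactly one operator.
            if (!names.add(name)) throw new IllegalStateException("operator name is not unique: " + name);
        }
        Map<String, String> mapping = new HashMap<>();
        for (Map.Entry<String, String> e : checkpointIdToName.entrySet()) {
            String newId = dagNameToId.get(e.getValue());
            if (newId == null) {
                throw new IllegalStateException("no operator named '" + e.getValue() + "' in the new DAG");
            }
            mapping.put(e.getKey(), newId);
        }
        return mapping;
    }

    public static void main(String[] args) {
        Map<String, String> fromCheckpoint = Map.of("a1", "Source: kafka", "a2", "Calc");
        Map<String, String> fromDag = Map.of("Source: kafka", "b1", "Calc", "b2");
        System.out.println(remap(fromCheckpoint, fromDag));
    }
}
```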

Checkpoint Optimization

Full Checkpoint

Full Checkpoint combines the speed of incremental checkpoints with the self-contained reliability of savepoints. It uses multiple threads to upload the new incremental files. It also uploads the files it would otherwise reference from earlier checkpoints, so the result does not depend on a long chain of previous checkpoints.
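Below is a minimal sketch of the upload step. It assumes the caller already knows two file sets: the files that are new since the last checkpoint, and the earlier-checkpoint files an incremental checkpoint would merely reference. The `Uploader` interface and the file names are hypothetical. Uploading the union in parallel yields a checkpoint that references nothing older than itself.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FullCheckpointSketch {
    // Hypothetical uploader: copies one state file into the new checkpoint directory
    // and returns the path it was written to.
    interface Uploader {
        String upload(String file, String targetDir) throws Exception;
    }

    static List<String> takeFullCheckpoint(Collection<String> newFiles, Collection<String> filesFromPrevious,
                                           String targetDir, Uploader uploader, int threads) {
        // Union of new files and files an incremental checkpoint would only reference.
        Set<String> toUpload = new LinkedHashSet<>(newFiles);
        toUpload.addAll(filesFromPrevious);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String file : toUpload) {
                futures.add(pool.submit(() -> uploader.upload(file, targetDir)));
            }
            List<String> uploaded = new ArrayList<>();
            for (Future<String> f : futures) {
                uploaded.add(f.get()); // any failed upload fails the whole checkpoint
            }
            return uploaded;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while uploading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("upload failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> files = takeFullCheckpoint(List.of("000012.sst"), List.of("000007.sst", "000009.sst"),
                "chk-42", (file, dir) -> dir + "/" + file, 4);
        System.out.println(files);
    }
}
```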

Regional Checkpoint

For jobs with few or no all-to-all connections, the DAG is divided into independent regions and checkpoints are tracked per region. A failure in one region therefore no longer fails the checkpoint for the whole job. The original article illustrates the region division logic and failure handling with diagrams.
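One way to compute regions is sketched below under simplifying assumptions. The job is a linear pipeline, and each edge is either FORWARD (subtask i to subtask i) or ALL_TO_ALL. A region is a connected set of subtasks, found with union-find; `RegionDivision` is hypothetical. Forward edges keep subtask pipelines separate. A single all-to-all edge, by contrast, merges everything on both sides into one region, which is why the technique targets jobs with few such edges.

```java
import java.util.HashSet;
import java.util.Set;

final class RegionDivision {
    enum Pattern { FORWARD, ALL_TO_ALL }

    private final int[] parent;

    private RegionDivision(int n) {
        parent = new int[n];
        for (int i = 0; i < n; i++) parent[i] = i;
    }

    private int find(int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]]; // path halving
            x = parent[x];
        }
        return x;
    }

    private void union(int a, int b) {
        parent[find(a)] = find(b);
    }

    // parallelism[v]: parallelism of vertex v; edges[v]: pattern between vertex v and v + 1.
    static int countRegions(int[] parallelism, Pattern[] edges) {
        if (edges.length != parallelism.length - 1) throw new IllegalArgumentException("need one edge per vertex pair");
        int[] offset = new int[parallelism.length];
        int total = 0;
        for (int v = 0; v < parallelism.length; v++) {
            offset[v] = total; // global index of subtask 0 of vertex v
            total += parallelism[v];
        }
        RegionDivision uf = new RegionDivision(total);
        for (int v = 0; v < edges.length; v++) {
            int up = offset[v];
            int down = offset[v + 1];
            if (edges[v] == Pattern.FORWARD) {
                if (parallelism[v] != parallelism[v + 1]) throw new IllegalArgumentException("forward needs equal parallelism");
                for (int i = 0; i < parallelism[v]; i++) uf.union(up + i, down + i);
            } else {
                // Every upstream subtask feeds every downstream subtask: all join one region.
                for (int i = 0; i < parallelism[v]; i++) uf.union(up + i, down);
                for (int j = 1; j < parallelism[v + 1]; j++) uf.union(down + j, down);
            }
        }
        Set<Integer> roots = new HashSet<>();
        for (int i = 0; i < total; i++) roots.add(uf.find(i));
        return roots.size();
    }

    public static void main(String[] args) {
        System.out.println(countRegions(new int[] {4, 4, 4}, new Pattern[] {Pattern.FORWARD, Pattern.FORWARD}));
    }
}
```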

Tooling Development

Utilities were built for checkpoint maintenance, including periodic state file cleanup, checkpoint metadata enrichment (logging Operator ID ↔ Name mappings), and metadata inspection tools that can filter by operator ID or name.
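For the periodic cleanup, a hypothetical orphan-detection rule could look like the sketch below. A file in the shared state directory is deletable when no retained checkpoint references it and it is older than a grace period. The grace period protects files written by a checkpoint that is still in progress. `StateFileJanitor` and the grace-period policy are assumptions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class StateFileJanitor {
    // Hypothetical view of one file in the shared state directory.
    static final class StateFile {
        final String path;
        final long modifiedMillis;
        StateFile(String path, long modifiedMillis) {
            this.path = path;
            this.modifiedMillis = modifiedMillis;
        }
    }

    // Returns files that no retained checkpoint references and that are older than the grace period.
    static List<String> orphans(List<StateFile> onDisk, Collection<Set<String>> retainedRefs,
                                long nowMillis, long graceMillis) {
        Set<String> referenced = new HashSet<>();
        for (Set<String> refs : retainedRefs) referenced.addAll(refs);
        List<String> result = new ArrayList<>();
        for (StateFile f : onDisk) {
            boolean unreferenced = !referenced.contains(f.path);
            boolean oldEnough = nowMillis - f.modifiedMillis > graceMillis; // skip in-flight writes
            if (unreferenced && oldEnough) result.add(f.path);
        }
        return result;
    }

    public static void main(String[] args) {
        List<StateFile> files = List.of(new StateFile("shared/a.sst", 0), new StateFile("shared/b.sst", 0));
        System.out.println(orphans(files, List.of(Set.of("shared/a.sst")), 60_000, 10_000));
    }
}
```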

Availability Enhancements

Hybrid HA

ZooKeeper (ZK) can become unstable at this scale. To work around this, a hybrid HA mechanism uses ZK while it is healthy and falls back to HDFS when ZK fails. Critical data is written to both systems, with versioning to keep the two copies consistent. This data covers the ResourceManager address, checkpoint IDs, job state, and job graph.
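Below is a minimal in-memory sketch of the dual-write and versioned-read pattern. `Store`, `InMemoryStore`, and the single-writer version counter are hypothetical stand-ins for the real ZooKeeper and HDFS clients. The sketch makes three choices:

- HDFS writes must succeed.
- ZooKeeper writes may fail.
- Reads prefer whichever copy carries the higher version, so a stale ZooKeeper entry left behind during an outage is ignored once ZooKeeper recovers.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class HybridHaSketch {
    static final class Versioned {
        final long version;
        final byte[] value;
        Versioned(long version, byte[] value) {
            this.version = version;
            this.value = value;
        }
    }

    // Hypothetical minimal store API standing in for a ZooKeeper or HDFS client.
    interface Store {
        void put(String key, Versioned v) throws Exception;
        Versioned get(String key) throws Exception; // null if absent
    }

    // In-memory store whose availability can be toggled to simulate an outage.
    static final class InMemoryStore implements Store {
        private final Map<String, Versioned> data = new HashMap<>();
        boolean available = true;

        public void put(String key, Versioned v) throws Exception {
            if (!available) throw new Exception("store unavailable");
            data.put(key, v);
        }

        public Versioned get(String key) throws Exception {
            if (!available) throw new Exception("store unavailable");
            return data.get(key);
        }
    }

    private final Store zk;
    private final Store hdfs;
    private long lastVersion = 0; // simplification: a single writer assigns versions

    HybridHaSketch(Store zk, Store hdfs) {
        this.zk = zk;
        this.hdfs = hdfs;
    }

    void write(String key, byte[] value) {
        Versioned v = new Versioned(++lastVersion, value);
        try {
            hdfs.put(key, v); // in this sketch the HDFS copy must succeed
        } catch (Exception e) {
            throw new IllegalStateException("HDFS write failed", e);
        }
        try {
            zk.put(key, v);
        } catch (Exception e) {
            // ZooKeeper unhealthy: the HDFS copy, with the newer version, is enough.
        }
    }

    Versioned read(String key) {
        Versioned fromZk = null;
        Versioned fromHdfs = null;
        try { fromZk = zk.get(key); } catch (Exception e) { /* fall back to HDFS */ }
        try { fromHdfs = hdfs.get(key); } catch (Exception e) { /* use ZooKeeper only */ }
        if (fromZk == null) return fromHdfs;
        if (fromHdfs == null) return fromZk;
        return fromZk.version >= fromHdfs.version ? fromZk : fromHdfs; // newer copy wins
    }

    public static void main(String[] args) {
        InMemoryStore zkStore = new InMemoryStore();
        HybridHaSketch ha = new HybridHaSketch(zkStore, new InMemoryStore());
        ha.write("checkpointId", "7".getBytes(StandardCharsets.UTF_8));
        zkStore.available = false;
        System.out.println(new String(ha.read("checkpointId").value, StandardCharsets.UTF_8));
    }
}
```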

JobManager Recovery (Reconciliation)

During JobManager failover, snapshots of the ExecutionGraph are stored in a filesystem store. Upon recovery, the ExecutionGraph is reconstructed, and tasks are reconciled to resume processing without full job restart.
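The reconciliation step can be sketched as a comparison between two sets: what the restored ExecutionGraph snapshot expects, and what TaskManagers report after reconnecting. The three-way policy below (adopt, reschedule, cancel) and the attempt-ID strings are assumptions for illustration; `Reconciler` is hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class Reconciler {
    enum Action { ADOPT, RESCHEDULE, CANCEL }

    // snapshotRunning: attempts the restored ExecutionGraph snapshot recorded as running.
    // reported: attempts that TaskManagers report as still running after reconnecting.
    static Map<String, Action> reconcile(Set<String> snapshotRunning, Set<String> reported) {
        Map<String, Action> actions = new TreeMap<>();
        for (String id : snapshotRunning) {
            // Still alive on a TaskManager: keep it instead of restarting the whole job.
            actions.put(id, reported.contains(id) ? Action.ADOPT : Action.RESCHEDULE);
        }
        for (String id : reported) {
            // Unknown to the snapshot: treat it as stray and cancel it.
            if (!snapshotRunning.contains(id)) actions.put(id, Action.CANCEL);
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(reconcile(Set.of("map-0", "map-1"), Set.of("map-0")));
    }
}
```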

Regional Checkpoint for HDFS Sink

A new RegionShareOperator interface allows the StreamingFileCommitter, a single-parallelism operator, to belong to multiple regions. This enables regional checkpointing for jobs with an HDFS sink.

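// RegionShareOperator.java
// Implemented by operators shared across regions, such as the single-parallelism StreamingFileCommitter.
public interface RegionShareOperator {
    // Invoked when a regional checkpoint completes; the detail lists the tasks that failed, if any.
    void notifyRegionalCkComplete(RegionCheckpointFailedDetail detail);
}

// RegionCheckpointFailedDetail.java
import java.util.Set;

public class RegionCheckpointFailedDetail {
    long checkpointId;
    FailedScope scope;               // upstream, downstream, or both sides of the shared operator
    Set<FailedTaskInfo> failedTasks; // tasks that did not complete this checkpoint

    enum FailedScope { UP_STREAM, DOWN_STREAM, BOTH }
    enum FailedType { ERROR, EXPIRED }

    static class FailedTaskInfo {
        int subTaskId;
        String taskName;
        FailedType reason;
        String message;
    }
}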
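The sketch below is a hypothetical illustration of how a shared committer might react when a regional checkpoint completes. Files staged by writer subtasks that completed the checkpoint are committed, while files from failed subtasks stay pending. `RegionAwareCommitter` and the keep-pending policy are assumptions; the source does not describe the committer's internal logic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class RegionAwareCommitter {
    // writer subtask index -> files it staged for the checkpoint in progress
    private final Map<Integer, List<String>> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void stage(int writerSubtask, String file) {
        pending.computeIfAbsent(writerSubtask, k -> new ArrayList<>()).add(file);
    }

    // failedWriterSubtasks would be derived from the failed-task details of the regional checkpoint.
    void onRegionalCheckpointComplete(Set<Integer> failedWriterSubtasks) {
        Iterator<Map.Entry<Integer, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, List<String>> e = it.next();
            if (!failedWriterSubtasks.contains(e.getKey())) {
                committed.addAll(e.getValue()); // region succeeded: make its files visible
                it.remove();
            }
            // Files from failed subtasks are left pending.
        }
    }

    List<String> committed() {
        return committed;
    }

    Map<Integer, List<String>> pending() {
        return pending;
    }

    public static void main(String[] args) {
        RegionAwareCommitter c = new RegionAwareCommitter();
        c.stage(0, "part-0-0");
        c.stage(1, "part-1-0");
        c.onRegionalCheckpointComplete(Set.of(1));
        System.out.println("committed=" + c.committed() + " pending=" + c.pending());
    }
}
```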

Rescale Partitioner for SQL Jobs

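This case arises when the number of Kafka partitions and the job parallelism are not multiples of each other. Bilibili then disables the user-set source parallelism and introduces a Rescale mode. Rescale connects upstream and downstream subtasks pointwise rather than all-to-all, which changes how the job is divided into regions and improves availability for SQL jobs.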
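The rescale connection pattern can be sketched with the index arithmetic that, as we understand it, Flink uses for POINTWISE edges. Each downstream subtask reads a contiguous range of upstream subtasks, or each upstream subtask feeds a contiguous range of downstream ones, so no subtask is connected to all subtasks on the other side. `RescaleConnections` is hypothetical. With 30 upstream and 20 downstream subtasks, for example, the job splits into 20 small regions rather than one.

```java
import java.util.ArrayList;
import java.util.List;

final class RescaleConnections {
    // For each downstream subtask, the upstream subtasks it reads from.
    static List<List<Integer>> upstreamsOf(int sourceCount, int targetCount) {
        List<List<Integer>> result = new ArrayList<>();
        for (int t = 0; t < targetCount; t++) result.add(new ArrayList<>());
        if (sourceCount >= targetCount) {
            // Each target reads a contiguous, non-overlapping range of sources.
            for (int t = 0; t < targetCount; t++) {
                int start = t * sourceCount / targetCount;
                int end = (t + 1) * sourceCount / targetCount;
                for (int s = start; s < end; s++) result.get(t).add(s);
            }
        } else {
            // Each source feeds a contiguous, non-overlapping range of targets.
            for (int s = 0; s < sourceCount; s++) {
                int start = (s * targetCount + sourceCount - 1) / sourceCount;
                int end = ((s + 1) * targetCount + sourceCount - 1) / sourceCount;
                for (int t = start; t < end; t++) result.get(t).add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> links = upstreamsOf(30, 20);
        for (int t = 0; t < 3; t++) System.out.println("target " + t + " <- " + links.get(t));
    }
}
```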

Approximate Local Recovery

For high‑parallelism, all‑to‑all jobs, a single‑point recovery mechanism restarts only the failed task, handling partial records and buffer cleanup to avoid long downtime.
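The sketch below illustrates the partial-record problem, assuming length-prefixed framing (a 4-byte length, then the payload); `PartialRecordAwareReader` is hypothetical. If an upstream task dies mid-record, the downstream reader holds bytes that will never be completed. It therefore discards them when told the upstream restarted, then resumes with fresh records from the new attempt.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartialRecordAwareReader {
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream(); // bytes of an unfinished record
    private final List<byte[]> records = new ArrayList<>();

    // Appends bytes from the channel and emits every complete length-prefixed record.
    void onBuffer(byte[] chunk) {
        partial.write(chunk, 0, chunk.length);
        byte[] data = partial.toByteArray();
        int pos = 0;
        while (data.length - pos >= 4) {
            int len = ByteBuffer.wrap(data, pos, 4).getInt();
            if (data.length - pos - 4 < len) break; // payload has not fully arrived yet
            records.add(Arrays.copyOfRange(data, pos + 4, pos + 4 + len));
            pos += 4 + len;
        }
        partial.reset();
        partial.write(data, pos, data.length - pos); // keep the unfinished tail
    }

    // The upstream attempt died mid-record; its tail will never be completed, so drop it.
    void onUpstreamRestarted() {
        partial.reset();
    }

    List<byte[]> records() {
        return records;
    }

    static byte[] frame(String payload) {
        byte[] p = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + p.length).putInt(p.length).put(p).array();
    }

    public static void main(String[] args) {
        PartialRecordAwareReader reader = new PartialRecordAwareReader();
        byte[] hello = frame("hello");
        reader.onBuffer(Arrays.copyOfRange(hello, 0, 6)); // upstream fails after 6 bytes
        reader.onUpstreamRestarted();
        reader.onBuffer(frame("world"));
        System.out.println(new String(reader.records().get(0), StandardCharsets.UTF_8));
    }
}
```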

Other Optimizations

Backlog‑Based Load Balancing

Replacing round‑robin channel selection with a BacklogSize‑driven strategy dynamically balances load across downstream subtasks, reducing backpressure.
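Below is a minimal sketch of backlog-driven selection. It picks the channel with the smallest backlog, scanning from a rotating cursor so that ties still fall back to round-robin. `BacklogChannelSelector` and the `int[]` backlog input are assumptions; in practice the backlog would come from the writer's per-channel queues.

```java
final class BacklogChannelSelector {
    private int cursor = 0; // where the next scan starts, so ties rotate like round-robin

    // backlog[i] = buffers queued for downstream channel i. Returns the least-loaded channel.
    int select(int[] backlog) {
        int n = backlog.length;
        int best = cursor % n;
        for (int k = 1; k < n; k++) {
            int c = (cursor + k) % n;
            if (backlog[c] < backlog[best]) best = c; // strictly smaller wins; ties keep the earlier scan position
        }
        cursor = (best + 1) % n;
        return best;
    }

    public static void main(String[] args) {
        BacklogChannelSelector selector = new BacklogChannelSelector();
        System.out.println(selector.select(new int[] {5, 0, 3}));
    }
}
```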

Large‑Scale Cluster Maintenance

A black‑list‑aware restart workflow safely drains jobs from machines slated for maintenance, ensuring continuous service during node removal.
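The sketch below is a hypothetical illustration of the bookkeeping behind the drain. It finds the jobs with containers on hosts scheduled for maintenance, which would then be restarted with those hosts blacklisted. A host is treated as removable only once no job still runs there. `MaintenanceDrain` and the job-to-host map are assumptions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class MaintenanceDrain {
    // jobHosts: job -> hosts currently running its containers.
    // Returns the jobs that must be restarted (with the maintenance hosts blacklisted).
    static Set<String> jobsToRestart(Map<String, Set<String>> jobHosts, Set<String> maintenance) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<String>> e : jobHosts.entrySet()) {
            if (!Collections.disjoint(e.getValue(), maintenance)) result.add(e.getKey());
        }
        return result;
    }

    // A host is safe to remove once no job has a container left on it.
    static boolean safeToRemove(String host, Map<String, Set<String>> jobHosts) {
        for (Set<String> hosts : jobHosts.values()) {
            if (hosts.contains(host)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> jobs = Map.of("jobA", Set.of("h1", "h2"), "jobB", Set.of("h3"));
        System.out.println(jobsToRestart(jobs, Set.of("h2")));
    }
}
```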

Future Work

Plans include extending checkpoint compatibility to aggregation and async dimension‑join scenarios, adapting Regional Checkpoint for HDFS sink failover, and implementing zero‑downtime scaling of parallelism.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
