
How Bilibili Built a Remote State Backend for Flink Using Taishan KV Store

This article explains Bilibili's design and implementation of a remote state backend for Flink, detailing the motivations, pain points of the existing RocksDBStateBackend, the architecture of TaishanStateBackend, and the performance optimizations applied to achieve storage‑compute separation and faster rescaling.

dbaplus Community

Background

To improve resource utilization, Bilibili is consolidating real‑time and online workloads onto a cloud‑native platform. Flink, as a stateful compute engine, requires both compute and durable storage, which motivates a migration from the local RocksDBStateBackend to a remote state backend that separates storage from compute.

Pain Points

Flink’s StateBackend stores task state in two categories:

OperatorStateBackend: small, unrelated to the key, e.g., Kafka offsets.

KeyedStateBackend: potentially large, tightly coupled with the computation (aggregations, joins, etc.).

Bilibili runs more than 4,000 streaming jobs, 95% of which are SQL jobs; about half have state, and many exceed 500 GB. The default RocksDBStateBackend keeps state on high‑performance local disks and uploads incremental snapshots to a filesystem. Two major issues arise:

Low disk utilization: Most tasks have little or no keyed state, leaving large disks under‑used.

Slow rescaling for large tasks: During rescale, state must be redistributed across TaskManagers, which can take up to 30 minutes for terabyte‑scale state.

RemoteStateBackend Concept

The solution is to externalize state storage so that each KeyGroupId is written directly to a remote KV service. This eliminates the need for costly local‑disk redistribution and enables true storage‑compute separation.

Selection of Taishan KV Store

After evaluating internal options, Bilibili chose the self‑developed Taishan KV store. Taishan is built on RocksDB and SparrowDB, uses Raft for consistency, and provides Java put / get / del / scan APIs, snapshot capability, horizontal scaling, and online upgrades—features required for a remote state backend.

State Switching Guarantee

In KeyedStateBackend, the KeyGroupId for a record is computed as MathUtils.murmurHash(key.hashCode()) % maxParallelism. Because maxParallelism stays fixed for the life of the job, a given key always maps to the same KeyGroupId, and each KeyGroupId is owned by exactly one subtask at a time, so identical keys are always processed by the same subtask. During a rescale, only the mapping from KeyGroupId to subtasks changes; the underlying data remains in its Taishan shard.
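
The mapping can be sketched as follows. This is a minimal illustration rather than Flink's source: mix is a stand-in for MathUtils.murmurHash, and the subtaskFor formula (contiguous key-group ranges per subtask) is an assumption, since the article does not say how key groups are assigned to subtasks.

```java
final class KeyGroupSketch {

    /** Stand-in for MathUtils.murmurHash: a well-mixed, non-negative int hash. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h & Integer.MAX_VALUE; // clear the sign bit so % is non-negative
    }

    /** The KeyGroupId depends only on the key and maxParallelism, never on parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    /** Assumed assignment: each subtask owns a contiguous range of key groups. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int kg = keyGroupFor(key, maxParallelism);
            System.out.printf("%s -> keyGroup %d, subtask %d of 4, subtask %d of 8%n",
                    key, kg, subtaskFor(kg, maxParallelism, 4), subtaskFor(kg, maxParallelism, 8));
        }
    }
}
```

Rescaling from 4 to 8 subtasks changes only the result of subtaskFor; keyGroupFor, and therefore the shard that holds the data, is unaffected.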

KeyGroup rescale diagram

StateBackend Topology Design

The backend must support several state types (e.g., InternalValueState, InternalMapState, InternalListState, priority‑queue state) and checkpointing. The design aligns each KeyGroupId with a Taishan shard ID, keeps the shard count equal to Flink’s maxParallelism, and creates one Taishan table per operator that holds state.

StateBackend topology

TaishanStateBackend Architecture

Map each KeyGroupId to a Taishan shard ID (one‑to‑one).

The shard count equals maxParallelism, and the set of shards does not change after creation.

When a job starts, each operator with keyed state creates a dedicated Taishan table.

KV entries are prefixed with the operator’s column‑family to avoid cross‑operator collisions.

During a checkpoint, every shard creates a snapshot; all shards share the same snapshot ID.

During recovery, each shard restores from its snapshot using the same ID.
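
The points above can be sketched with an in-memory stand-in for one Taishan table. Everything here is hypothetical and for illustration only: the class and method names, the "#" separator used for the column-family prefix, and the snapshot mechanics are assumptions, not the Taishan API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for one per-operator table; all names are hypothetical. */
final class ShardedTableSketch {
    // Shard i holds the state of key group i (one-to-one mapping).
    private final List<TreeMap<String, String>> shards = new ArrayList<>();
    // Every shard is snapshotted under the same checkpoint ID.
    private final Map<Long, List<TreeMap<String, String>>> snapshots = new HashMap<>();

    ShardedTableSketch(int maxParallelism) {
        for (int i = 0; i < maxParallelism; i++) {
            shards.add(new TreeMap<>());
        }
    }

    /** Prefix the user key with the column family so different states cannot collide. */
    static String stateKey(String columnFamily, String userKey) {
        return columnFamily + "#" + userKey;
    }

    void put(int keyGroupId, String columnFamily, String userKey, String value) {
        shards.get(keyGroupId).put(stateKey(columnFamily, userKey), value);
    }

    String get(int keyGroupId, String columnFamily, String userKey) {
        return shards.get(keyGroupId).get(stateKey(columnFamily, userKey));
    }

    /** Checkpoint: copy every shard and file the copies under one snapshot ID. */
    void snapshot(long checkpointId) {
        List<TreeMap<String, String>> copy = new ArrayList<>();
        for (TreeMap<String, String> shard : shards) {
            copy.add(new TreeMap<>(shard));
        }
        snapshots.put(checkpointId, copy);
    }

    /** Recovery: every shard is restored from the snapshot with the same ID. */
    void restore(long checkpointId) {
        List<TreeMap<String, String>> saved = snapshots.get(checkpointId);
        if (saved == null) {
            throw new IllegalArgumentException("unknown checkpoint " + checkpointId);
        }
        for (int i = 0; i < shards.size(); i++) {
            shards.set(i, new TreeMap<>(saved.get(i)));
        }
    }
}
```

Because the shard index is the KeyGroupId itself, a rescale needs no data movement in this model: a subtask simply reads and writes the shards of whichever key groups it now owns.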

TaishanStateBackend architecture

Optimizations

Write Optimizations

Writes are batched in a subtask‑local BlockingQueue. A background thread flushes the queue when the batch reaches a size limit (e.g., 800 entries), when a latency threshold expires, or when a checkpoint occurs. This reduces RPC calls by 2‑4× and improves write throughput.
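
A minimal sketch of this batching scheme, under stated assumptions: the batchPut callback stands in for a batched Taishan write RPC (hypothetical), keys and values are plain strings, and the checkpoint flush is modeled as a barrier item placed on the same queue so that ordering is preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class BatchingWriter implements AutoCloseable {

    /** A queued item: a key/value write, or a flush barrier when barrier != null. */
    private static final class Op {
        final String key;
        final String value;
        final CountDownLatch barrier;

        Op(String key, String value, CountDownLatch barrier) {
            this.key = key;
            this.value = value;
            this.barrier = barrier;
        }
    }

    private final BlockingQueue<Op> queue = new LinkedBlockingQueue<>();
    private final int maxBatch;
    private final long maxDelayNanos;
    private final Consumer<List<String[]>> batchPut; // hypothetical batched RPC
    private final Thread flusher;

    BatchingWriter(int maxBatch, long maxDelayMillis, Consumer<List<String[]>> batchPut) {
        this.maxBatch = maxBatch;
        this.maxDelayNanos = TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
        this.batchPut = batchPut;
        this.flusher = new Thread(this::run, "state-write-flusher");
        this.flusher.setDaemon(true);
        this.flusher.start();
    }

    /** Called on the task thread: enqueue only, no RPC. */
    void put(String key, String value) {
        queue.add(new Op(key, value, null));
    }

    /** Called at checkpoint time: blocks until everything queued so far has been sent. */
    void flushForCheckpoint() throws InterruptedException {
        CountDownLatch barrier = new CountDownLatch(1);
        queue.add(new Op(null, null, barrier));
        barrier.await();
    }

    private void run() {
        List<String[]> batch = new ArrayList<>();
        long deadline = 0L;
        try {
            while (true) {
                long waitNanos = batch.isEmpty()
                        ? maxDelayNanos
                        : Math.max(0L, deadline - System.nanoTime());
                Op op = queue.poll(waitNanos, TimeUnit.NANOSECONDS);
                boolean isBarrier = op != null && op.barrier != null;
                if (op != null && !isBarrier) {
                    if (batch.isEmpty()) {
                        deadline = System.nanoTime() + maxDelayNanos; // clock starts at first entry
                    }
                    batch.add(new String[] {op.key, op.value});
                }
                boolean full = batch.size() >= maxBatch;
                boolean expired = !batch.isEmpty() && System.nanoTime() - deadline >= 0;
                if (!batch.isEmpty() && (full || expired || isBarrier)) {
                    batchPut.accept(new ArrayList<>(batch)); // one RPC for the whole batch
                    batch.clear();
                }
                if (isBarrier) {
                    op.barrier.countDown();
                }
            }
        } catch (InterruptedException e) {
            // close() interrupts the flusher; exit quietly.
        }
    }

    @Override
    public void close() throws InterruptedException {
        flushForCheckpoint();
        flusher.interrupt();
        flusher.join();
    }
}
```

With a batch size of 800, writing 2,000 entries and then flushing for a checkpoint results in at least three RPCs instead of 2,000. A production version would also need error handling for a failed RPC, which this sketch omits.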

Read Optimizations

A two‑layer cache sits between Flink and Taishan:

A Bloom‑filter‑like off‑heap structure tracks the keys that exist, so most reads of absent keys (ReadNull requests) are answered without a call to Taishan.

Because Bloom‑filter false positives increase over time, a TTL‑based KV cache is added. Each entry stores an absolute expiration timestamp; eviction respects this timestamp.

TTL is also applied to window assigners (tumbling, sliding, cumulative) and interval joins, using windowLength + allowedLateness as the TTL value.
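
The read path described above can be sketched as follows. This is an illustration under assumptions, not the production design: the filter is an on-heap BitSet rather than an off-heap structure, remoteGet stands in for a Taishan read (hypothetical), time is passed in explicitly, and an expired cache entry falls through to a remote read rather than being treated as deleted state.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class TwoLayerReadCache {
    private static final int HASHES = 3;

    private static final class Entry {
        final String value;
        final long expiresAtMillis; // absolute expiration timestamp

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final BitSet seenKeys; // on-heap stand-in for the off-heap filter
    private final int numBits;
    private final Map<String, Entry> ttlCache = new HashMap<>();
    private final Function<String, String> remoteGet; // hypothetical remote lookup
    int remoteReads = 0;

    TwoLayerReadCache(int numBits, Function<String, String> remoteGet) {
        this.numBits = numBits;
        this.seenKeys = new BitSet(numBits);
        this.remoteGet = remoteGet;
    }

    /** i-th bit position for a key, derived from two hashes (double hashing). */
    private int bitIndex(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
        return Math.floorMod(h1 + i * h2, numBits);
    }

    /** Every write marks the key in the filter and refreshes the TTL cache. */
    void recordWrite(String key, String value, long nowMillis, long ttlMillis) {
        for (int i = 0; i < HASHES; i++) {
            seenKeys.set(bitIndex(key, i));
        }
        ttlCache.put(key, new Entry(value, nowMillis + ttlMillis));
    }

    String get(String key, long nowMillis) {
        for (int i = 0; i < HASHES; i++) {
            if (!seenKeys.get(bitIndex(key, i))) {
                return null; // key was never written: skip the RPC entirely
            }
        }
        Entry e = ttlCache.get(key);
        if (e != null) {
            if (nowMillis < e.expiresAtMillis) {
                return e.value; // fresh cache hit
            }
            ttlCache.remove(key); // expired: drop it and fall through
        }
        remoteReads++;
        return remoteGet.apply(key);
    }

    /** TTL for window state: windowLength + allowedLateness. */
    static long windowTtlMillis(long windowLengthMillis, long allowedLatenessMillis) {
        return windowLengthMillis + allowedLatenessMillis;
    }
}
```

The filter only helps if every key written by the subtask passes through recordWrite; after a restore it would have to be rebuilt or bypassed, which this sketch does not cover.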

Memory Model Optimizations

Initially, each subtask instantiated one Caffeine cache per keyed state (N × M instances in total, where N is the number of slots and M the number of keyed states), causing high memory fragmentation and GC pressure. The solution replaces Caffeine with an off‑heap cache (OHC) shared across subtasks within the same slot:

Managed off‑heap region provides a fixed‑size hash table; automatic rehash is disabled.

Custom eviction logic checks the stored expiration timestamp before removal.

Timeout‑based eviction is disabled to avoid throughput impact.
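
To make the three points concrete, here is a small sketch of a fixed-size off-heap table with timestamp-aware eviction. It does not use the OHC library and is not the article's implementation: the slot layout, the probe window of 8, the long keys and values, and the rule "evict the entry with the earliest expiration" are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;

final class FixedOffHeapCache {
    private static final int SLOT_BYTES = 24; // key, value, expiresAt: 8 bytes each
    private static final int PROBES = 8;      // slots examined per key

    private final ByteBuffer table; // direct buffer: lives outside the Java heap
    private final int slots;        // fixed at construction, never rehashed

    FixedOffHeapCache(int slots) {
        if (Integer.bitCount(slots) != 1 || slots < PROBES) {
            throw new IllegalArgumentException("slots must be a power of two >= " + PROBES);
        }
        this.slots = slots;
        this.table = ByteBuffer.allocateDirect(slots * SLOT_BYTES); // zero-filled
    }

    private int slotOffset(long key, int probe) {
        long h = key * 0x9E3779B97F4A7C15L; // multiplicative hash
        int start = (int) (h >>> 40);       // top 24 bits, non-negative
        return ((start + probe) & (slots - 1)) * SLOT_BYTES;
    }

    /** expiresAtMillis must be > 0, because 0 marks an empty slot. */
    synchronized void put(long key, long value, long expiresAtMillis) {
        if (expiresAtMillis <= 0) {
            throw new IllegalArgumentException("expiresAtMillis must be positive");
        }
        int victim = slotOffset(key, 0);
        long victimExpiry = Long.MAX_VALUE;
        for (int p = 0; p < PROBES; p++) {
            int off = slotOffset(key, p);
            long exp = table.getLong(off + 16);
            if (exp != 0 && table.getLong(off) == key) {
                victim = off; // same key: overwrite in place
                break;
            }
            if (exp < victimExpiry) { // empty (0) first, then the earliest expiration
                victim = off;
                victimExpiry = exp;
            }
        }
        table.putLong(victim, key);
        table.putLong(victim + 8, value);
        table.putLong(victim + 16, expiresAtMillis);
    }

    /** Returns null when the key is absent or its stored timestamp has passed. */
    synchronized Long get(long key, long nowMillis) {
        for (int p = 0; p < PROBES; p++) {
            int off = slotOffset(key, p);
            long exp = table.getLong(off + 16);
            if (exp != 0 && table.getLong(off) == key) {
                return exp > nowMillis ? table.getLong(off + 8) : null;
            }
        }
        return null;
    }
}
```

No background thread scans for expired entries here; expiration is checked only on reads and when a slot has to be reclaimed, which keeps eviction off the hot path. The synchronized methods are the simplest way to let several subtasks in a slot share one instance; a real cache would use finer-grained locking.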

Off‑heap cache structure
Cache sharing diagram
Hash table configuration
Eviction logic

Current Status and Future Work

Since November 2022, more than 100 online jobs with keyed state have been migrated to the remote backend. RPC overhead has caused a slight increase in overall resource usage, but storage‑compute separation and faster rescaling have been achieved.

Open challenges:

High QPS with sparse keys: Cache‑miss pressure can saturate gRPC request rates.

Large key/value scenarios: Write stalls appear in join‑heavy workloads with very large values.

Tiered state storage: The current off‑heap‑only cache may run out of memory. Future work will combine off‑heap memory, local SSDs, and remote Taishan storage into a multi‑layer cache (inspired by Flink Forward 2022’s Tiered State Backend) and leverage idle SSD capacity in mixed‑deployment clusters.

References

https://mp.weixin.qq.com/s/AqG0jCbDGjpjGTDdG7pSLw
https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/g1_gc_tuning.html
https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager
https://github.com/snazy/ohc