Remote State Backend for Flink: Design, Optimization, and Deployment with Taishan KV Store
This article describes the motivation, challenges, design, and performance optimizations of a remote state backend for Flink that leverages Bilibili's Taishan distributed KV store to achieve storage‑compute separation, lighter checkpoints, faster rescaling, and improved resource utilization in large‑scale streaming jobs.
1. Background
Amid an industry-wide push to cut costs and improve efficiency, Bilibili is migrating online and real-time workloads onto a shared cloud-native architecture, consolidating their resource pools and schedulers. Real-time jobs demand strong compute capacity, while machines in the online pool typically offer little local storage or disk I/O. Flink, as a stateful compute engine, needs both compute and storage, which prompted a redesign of its state backend around remote storage.
2. Pain Points
Flink’s StateBackend stores task state in two forms: OperatorStateBackend, whose state is usually small and independent of data volume, and KeyedStateBackend, whose state can be large and grows with data volume and computation logic. Bilibili runs more than 4,000 streaming jobs; 95% are SQL jobs, about half are stateful, and many hold more than 500 GB of state. The RocksDBStateBackend currently used by default keeps state on local high-performance disks and uploads incremental snapshots at checkpoints, which leads to two main issues:
Low overall disk utilization: machines must be provisioned with high-performance disks, yet most tasks have little or no keyed state.
Slow rescaling for large-state jobs: state must be redistributed across TaskManagers, which can take up to half an hour.
3. RemoteStateBackend
To address these issues, state is moved to a remote service, which reduces Flink’s dependence on local disks and separates storage from compute. Bilibili’s self-developed Taishan KV store, built on the RocksDB and SparrowDB storage engines with Raft for replica consistency, provides Java put/get/del/scan APIs and snapshot support, which makes it a good foundation for a TaishanStateBackend.
3.1 State Switching Guarantee
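KeyGroupId is calculated as MathUtils.murmurHash(key.hashCode()) % maxParallelism. A given key therefore always maps to the same KeyGroup, and because each KeyGroup is owned by exactly one subtask, identical keys are always processed by the same subtask. During rescaling, the key-to-KeyGroup mapping never changes; only the range of KeyGroups assigned to each subtask does. With state held in Taishan, a rescaled subtask simply starts accessing a different set of KeyGroups remotely, so no state has to be physically moved.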
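To make the KeyGroup arithmetic concrete, here is a small self-contained Java sketch. It re-implements, as closely as we understand them, Flink's MathUtils.murmurHash and the key-group formulas in KeyGroupRangeAssignment so that it runs without Flink on the classpath. The class name KeyGroupMath and its method names are hypothetical, and Flink's own source remains the authority.

```java
/** Illustrative re-implementation of Flink's key-group arithmetic (hypothetical class name). */
final class KeyGroupMath {

    private KeyGroupMath() {}

    /** Murmur-style scramble of an int hash, mirroring Flink's MathUtils.murmurHash(int). */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        // Map the signed result onto a non-negative int.
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        }
        return 0;
    }

    /** Murmur3 32-bit finalizer. */
    private static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    /** KeyGroupId depends only on the key and maxParallelism, never on the current parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    /** Index of the subtask that owns a KeyGroup at the given parallelism. */
    static int subtaskFor(int keyGroupId, int maxParallelism, int parallelism) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Inclusive [start, end] KeyGroup range owned by one subtask. */
    static int[] rangeFor(int subtaskIndex, int maxParallelism, int parallelism) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }
}
```

Because keyGroupFor never looks at the current parallelism, rescaling from 4 to 8 subtasks only changes which subtask owns a given KeyGroup; with state held in Taishan, that ownership change is all a rescale requires.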
Taishan’s shard concept aligns with Flink’s KeyGroup: the number of shards is fixed, and snapshots are created per shard under an ID that can match Flink’s checkpoint ID.
3.2 StateBackend Topology Design
KeyedStateBackend manages the various keyed state types (InternalValueState, InternalMapState, InternalListState, and others) as well as the priority-queue state used for timers. Checkpointing is the backbone of Flink’s fault tolerance, so a RemoteStateBackend must also implement the SnapshotStrategy and RestoreOperation interfaces.
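As a rough illustration of what a remote keyed state does on each access, the sketch below shows a ValueState-like object whose value(), update() and clear() become get/put/delete calls against a remote store, scoped to the operator's current key and routed to the shard of that key. It does not implement Flink's actual InternalValueState interface; KvClient, RemoteValueState, InMemoryKv and longState are hypothetical names, and the string key encoding and floorMod shard function are simplifications.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

final class RemoteValueStateSketch {

    /** Hypothetical remote KV client; this is not Taishan's real Java API. */
    interface KvClient {
        byte[] get(int shard, String key);
        void put(int shard, String key, byte[] value);
        void delete(int shard, String key);
    }

    /** ValueState-like view over the remote store, scoped to the operator's current key. */
    static final class RemoteValueState<K, V> {
        private final String stateName;
        private final KvClient client;
        private final Supplier<K> currentKey;       // advanced by the operator for each record
        private final Function<K, Integer> shardOf; // KeyGroupId doubles as the shard ID
        private final Function<V, byte[]> serializer;
        private final Function<byte[], V> deserializer;

        RemoteValueState(String stateName, KvClient client, Supplier<K> currentKey,
                         Function<K, Integer> shardOf,
                         Function<V, byte[]> serializer, Function<byte[], V> deserializer) {
            this.stateName = stateName;
            this.client = client;
            this.currentKey = currentKey;
            this.shardOf = shardOf;
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        // Simplification: real backends serialize key and namespace into bytes.
        private String storageKey() {
            return stateName + "/" + currentKey.get();
        }

        private int shard() {
            return shardOf.apply(currentKey.get());
        }

        V value() {
            byte[] bytes = client.get(shard(), storageKey()); // one RPC per read
            return bytes == null ? null : deserializer.apply(bytes);
        }

        void update(V value) {
            if (value == null) { // treat null as a delete
                clear();
                return;
            }
            client.put(shard(), storageKey(), serializer.apply(value)); // one RPC per write
        }

        void clear() {
            client.delete(shard(), storageKey());
        }
    }

    /** In-memory stand-in for the remote store, for local experiments. */
    static final class InMemoryKv implements KvClient {
        final Map<String, byte[]> data = new HashMap<>();

        @Override public byte[] get(int shard, String key) { return data.get(shard + ":" + key); }
        @Override public void put(int shard, String key, byte[] value) { data.put(shard + ":" + key, value); }
        @Override public void delete(int shard, String key) { data.remove(shard + ":" + key); }
    }

    /** Convenience factory for a Long-valued state keyed by String. */
    static RemoteValueState<String, Long> longState(String name, KvClient client,
                                                     Supplier<String> currentKey, int maxParallelism) {
        return new RemoteValueState<>(name, client, currentKey,
                k -> Math.floorMod(k.hashCode(), maxParallelism), // placeholder for real KeyGroup assignment
                v -> Long.toString(v).getBytes(StandardCharsets.UTF_8),
                b -> Long.parseLong(new String(b, StandardCharsets.UTF_8)));
    }
}
```

Every state access in this naive form costs one network round trip, which is the overhead the caching and batching optimizations in section 4 target.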
3.3 TaishanStateBackend Architecture
Each KeyGroupId maps one-to-one to a Taishan shard ID.
The shard count equals Flink’s maxParallelism and cannot be changed afterwards.
Each keyed operator creates a dedicated Taishan table.
KV keys are prefixed with the operator identifier.
During checkpoint, each shard creates a snapshot with the same snapshot ID.
During restore, each shard is restored using its snapshot ID.
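The design points above can be sketched with an in-memory stand-in for one Taishan table. This is a minimal sketch under stated assumptions: FakeTaishanTable, kvKey, checkpoint and restore are hypothetical names, a map copy stands in for a real shard snapshot, and the "|" key separator is an assumed format. It shows a checkpoint at parallelism 2 being restored at parallelism 4 purely by having each new subtask roll its own shards back to the same checkpoint ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShardAlignedCheckpointSketch {

    /** Hypothetical stand-in for one Taishan table (one table per keyed operator, one shard per KeyGroup). */
    static final class FakeTaishanTable {
        private final Map<Integer, Map<String, String>> live = new HashMap<>();
        private final Map<Long, Map<Integer, Map<String, String>>> snapshots = new HashMap<>();

        void put(int shard, String key, String value) {
            live.computeIfAbsent(shard, s -> new HashMap<>()).put(key, value);
        }

        String get(int shard, String key) {
            return live.getOrDefault(shard, Map.of()).get(key);
        }

        /** Snapshot one shard under the given ID; a copy stands in for a real snapshot. */
        void createSnapshot(int shard, long snapshotId) {
            snapshots.computeIfAbsent(snapshotId, id -> new HashMap<>())
                     .put(shard, new HashMap<>(live.getOrDefault(shard, Map.of())));
        }

        /** Roll one shard back to the given snapshot (empty if the shard had no data then). */
        void restoreSnapshot(int shard, long snapshotId) {
            Map<String, String> snap = snapshots.getOrDefault(snapshotId, Map.of()).get(shard);
            live.put(shard, snap == null ? new HashMap<>() : new HashMap<>(snap));
        }
    }

    /** KV keys carry the operator identifier as a prefix (the separator is an assumption). */
    static String kvKey(String operatorId, String userKey) {
        return operatorId + "|" + userKey;
    }

    /** Inclusive [start, end] KeyGroup range of a subtask, following Flink's formula. */
    static int[] keyGroupRange(int subtask, int maxParallelism, int parallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Checkpoint: every owned shard snapshots under the checkpoint ID; the handle is the shard list. */
    static List<Integer> checkpoint(FakeTaishanTable table, int[] range, long checkpointId) {
        List<Integer> handle = new ArrayList<>();
        for (int shard = range[0]; shard <= range[1]; shard++) {
            table.createSnapshot(shard, checkpointId);
            handle.add(shard);
        }
        return handle;
    }

    /** Restore: every shard in the (possibly new) range rolls back to the same checkpoint ID. */
    static void restore(FakeTaishanTable table, int[] range, long checkpointId) {
        for (int shard = range[0]; shard <= range[1]; shard++) {
            table.restoreSnapshot(shard, checkpointId);
        }
    }
}
```

The checkpoint handle here is just a list of shard IDs plus the checkpoint ID, which is what keeps checkpoints light: no state bytes pass through Flink.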
4. Optimizations
Metrics collected from jobs running on RocksDBStateBackend showed state access rates of millions of reads and writes per second, with widely varying payload sizes. Moving state to remote storage turns each of these accesses into an RPC, which raised CPU usage. To mitigate this, a cache layer was added between Flink and Taishan, and writes were batched.
4.1 Write Optimization – Batch Write
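Put and remove requests are queued in a per-subtask BlockingQueue and flushed asynchronously in batches when a size threshold is reached. They are also flushed during checkpoints, and that flush must complete before the shard snapshots are taken so that no buffered write is missing from the snapshot. Batching reduced network write requests by orders of magnitude.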
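A minimal sketch of the batching idea, assuming a hypothetical BatchWriter sink that sends one batch per RPC (neither BatchWriteBuffer nor BatchWriter is a Taishan or Flink API). Mutations go into a LinkedBlockingQueue; once the threshold is reached, a single flusher thread drains the queue asynchronously, and flushForCheckpoint blocks until everything enqueued so far has been written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class BatchWriteBuffer implements AutoCloseable {

    /** A buffered mutation; a null value means remove. */
    record Mutation(String key, String value) {}

    /** Hypothetical sink that sends one batch to the remote store in a single RPC. */
    interface BatchWriter {
        void writeBatch(List<Mutation> batch);
    }

    private final BlockingQueue<Mutation> queue = new LinkedBlockingQueue<>();
    // A single flusher thread keeps batches in submission order.
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();
    private final AtomicBoolean flushScheduled = new AtomicBoolean(false);
    private final int threshold;
    private final BatchWriter writer;

    BatchWriteBuffer(int threshold, BatchWriter writer) {
        this.threshold = threshold;
        this.writer = writer;
    }

    void put(String key, String value) { enqueue(new Mutation(key, value)); }

    void remove(String key) { enqueue(new Mutation(key, null)); }

    private void enqueue(Mutation m) {
        queue.add(m);
        // Schedule at most one pending asynchronous flush once the threshold is hit.
        if (queue.size() >= threshold && flushScheduled.compareAndSet(false, true)) {
            flusher.execute(() -> {
                flushScheduled.set(false);
                drainAndWrite();
            });
        }
    }

    // Runs only on the flusher thread.
    private void drainAndWrite() {
        List<Mutation> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (!batch.isEmpty()) {
            writer.writeBatch(batch);
        }
    }

    /** Checkpoint-time flush: returns only after every buffered mutation has been written. */
    void flushForCheckpoint() {
        try {
            flusher.submit(this::drainAndWrite).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while flushing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("batch write failed", e.getCause());
        }
    }

    @Override
    public void close() {
        flushForCheckpoint();
        flusher.shutdown();
    }
}
```

One consequence of buffering is that a read may target a key whose latest write is still in the queue, so reads need to be served from the cache (or the buffer) to see their own writes; how the original system handles this is not described in the source.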
4.2 Read Optimization – Cache‑Accelerated Reads
A Caffeine cache (later replaced by an off-heap OHC cache) stores recently accessed state, sharply reducing the number of RPCs. Reads that return null, for example with sparse keys or keys that recur only periodically, were initially filtered with Bloom filters. Because a standard Bloom filter cannot delete entries, its false-positive rate kept growing as keys accumulated, so a custom OffHeapBloomFilter with TTL-aware entries was introduced.
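The read path can be sketched as: check the cache, then ask a Bloom filter whether the key could exist at all, and only then issue an RPC. This sketch deliberately avoids the Caffeine and OHC APIs: an access-ordered LinkedHashMap stands in for the LRU cache, and the Bloom filter is a plain on-heap BitSet without the TTL awareness of the OffHeapBloomFilter described above. RemoteKv, InMemoryRemote and CachedStateReader are hypothetical names.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class CachedStateReader {

    /** Hypothetical remote store interface. */
    interface RemoteKv {
        String get(String key);
        void put(String key, String value);
    }

    /** Map-backed stand-in for the remote store. */
    static final class InMemoryRemote implements RemoteKv {
        private final Map<String, String> data = new HashMap<>();
        @Override public String get(String key) { return data.get(key); }
        @Override public void put(String key, String value) { data.put(key, value); }
    }

    /**
     * Minimal Bloom filter using double hashing. Assumes every existing key passed through put()
     * since startup; after a restore it would have to be rebuilt, or it would yield false negatives.
     */
    static final class BloomFilter {
        private final BitSet bits;
        private final int numBits;
        private final int numHashes;

        BloomFilter(int numBits, int numHashes) {
            this.bits = new BitSet(numBits);
            this.numBits = numBits;
            this.numHashes = numHashes;
        }

        private int probe(String key, int i) {
            int h1 = key.hashCode();
            int h2 = Integer.reverse(h1) * 0x9E3779B9 | 1; // second hash, forced odd
            return Math.floorMod(h1 + i * h2, numBits);
        }

        void add(String key) {
            for (int i = 0; i < numHashes; i++) bits.set(probe(key, i));
        }

        boolean mightContain(String key) {
            for (int i = 0; i < numHashes; i++) {
                if (!bits.get(probe(key, i))) return false; // definitely absent
            }
            return true; // present, or a false positive
        }
    }

    private final RemoteKv remote;
    private final BloomFilter existingKeys;
    private final Map<String, String> lru;
    int remoteGets = 0; // exposed so the effect of caching can be observed

    CachedStateReader(RemoteKv remote, int cacheCapacity, BloomFilter filter) {
        this.remote = remote;
        this.existingKeys = filter;
        // accessOrder=true plus removeEldestEntry turns LinkedHashMap into an LRU cache.
        this.lru = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > cacheCapacity;
            }
        };
    }

    void put(String key, String value) {
        existingKeys.add(key);
        lru.put(key, value);
        remote.put(key, value);
    }

    String get(String key) {
        String cached = lru.get(key);
        if (cached != null) return cached;                // cache hit: no RPC
        if (!existingKeys.mightContain(key)) return null; // read-null filtered: no RPC
        remoteGets++;
        String value = remote.get(key);
        if (value != null) lru.put(key, value);
        return value;
    }
}
```

The sketch also shows why false positives matter: every key ever added stays in the filter, so in a long-running job with expiring state more and more absent keys slip past it and cost an RPC, which is the problem the TTL-aware filter addresses.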
4.3 Memory Model Optimization
The original per-operator Caffeine caches produced an excessive number of objects (N × M) and heavy GC pressure. Switching to a single off-heap OHC cache shared by all subtasks reduced GC overhead and gave tighter control over eviction, with customized hash-table sizing and an LRU eviction policy that is aware of state TTL.
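The shared, TTL-aware eviction idea can be illustrated on-heap. In this sketch a single process-wide cache holds entries for every operator and subtask under namespaced keys, enforces one byte budget, and, when over budget, first drops entries whose state TTL has already expired before falling back to least-recently-used eviction. It does not reproduce OHC's off-heap storage or API; SharedTtlLruCache and CacheKey are hypothetical names, and the injected clock exists only to make the behavior testable.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.function.LongSupplier;

final class SharedTtlLruCache {

    /** One cache for every operator and subtask in the process; keys are namespaced. */
    record CacheKey(String operatorId, int subtask, String key) {}

    private record Entry(byte[] value, long expiresAtMillis) {}

    // Access order: iteration starts at the least recently used entry.
    private final LinkedHashMap<CacheKey, Entry> entries = new LinkedHashMap<>(16, 0.75f, true);
    private final long maxBytes;
    private final LongSupplier clock;
    private long usedBytes = 0;

    SharedTtlLruCache(long maxBytes, LongSupplier clock) {
        this.maxBytes = maxBytes;
        this.clock = clock;
    }

    synchronized void put(CacheKey key, byte[] value, long ttlMillis) {
        Entry old = entries.put(key, new Entry(value, clock.getAsLong() + ttlMillis));
        if (old != null) usedBytes -= old.value().length;
        usedBytes += value.length;
        evictIfNeeded();
    }

    synchronized byte[] get(CacheKey key) {
        Entry e = entries.get(key);
        if (e == null) return null;
        if (e.expiresAtMillis() <= clock.getAsLong()) { // expired under state TTL
            entries.remove(key);
            usedBytes -= e.value().length;
            return null;
        }
        return e.value();
    }

    synchronized long usedBytes() {
        return usedBytes;
    }

    private void evictIfNeeded() {
        if (usedBytes <= maxBytes) return;
        long now = clock.getAsLong();
        // Pass 1: expired entries are dead state anyway, so drop them first.
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext() && usedBytes > maxBytes) {
            Entry e = it.next();
            if (e.expiresAtMillis() <= now) {
                it.remove();
                usedBytes -= e.value().length;
            }
        }
        // Pass 2: plain LRU eviction, starting from the least recently used entry.
        it = entries.values().iterator();
        while (it.hasNext() && usedBytes > maxBytes) {
            usedBytes -= it.next().value().length;
            it.remove();
        }
    }
}
```

With a pure LRU policy, an older but still-valid entry could be evicted while a recently touched entry that has already outlived its TTL stays cached; checking TTL first avoids spending the shared budget on dead state.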
5. Current Status and Future Work
Since November 2022, more than 100 stateful jobs have been migrated to TaishanStateBackend, achieving storage-compute separation and faster rescaling at the cost of a slight increase in network-related resource usage. Remaining challenges include high-QPS workloads with sparse keys, large keys or values that cause write stalls, and the need for tiered state storage that combines an off-heap cache, local SSD, and the remote KV store.
Future plans involve adopting tiered state‑backend concepts, leveraging idle SSDs in “disk‑less” deployments, and expanding TaishanStateBackend coverage across more Flink jobs.
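A tiered state store of the kind described above could look roughly like the read-through chain below: tiers are ordered from fastest (off-heap cache) to slowest (remote KV store), a read stops at the first tier that has the key and promotes the value into the faster tiers, and writes go through to every tier. This is only a sketch of the general concept; Tier, MapTier and TieredStateStore are hypothetical names, and the write-through and promotion policies are assumptions, not the planned design.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TieredStateStore {

    /** One storage tier, e.g. off-heap cache, local SSD, or remote KV (all hypothetical here). */
    interface Tier {
        String name();
        String get(String key);
        void put(String key, String value);
    }

    /** Map-backed tier for experiments; counts reads to show which tier served a request. */
    static final class MapTier implements Tier {
        private final String name;
        private final Map<String, String> data = new HashMap<>();
        int reads = 0;

        MapTier(String name) { this.name = name; }

        @Override public String name() { return name; }
        @Override public String get(String key) { reads++; return data.get(key); }
        @Override public void put(String key, String value) { data.put(key, value); }
    }

    private final List<Tier> tiers; // ordered fastest to slowest

    TieredStateStore(List<? extends Tier> tiers) {
        this.tiers = List.copyOf(tiers);
    }

    /** Read through the tiers; on a hit, promote the value into every faster tier. */
    String get(String key) {
        for (int i = 0; i < tiers.size(); i++) {
            String value = tiers.get(i).get(key);
            if (value != null) {
                for (int j = 0; j < i; j++) tiers.get(j).put(key, value);
                return value;
            }
        }
        return null;
    }

    /** Write through to every tier; the slowest tier (remote KV) remains the source of truth. */
    void put(String key, String value) {
        for (Tier t : tiers) t.put(key, value);
    }
}
```

After the first read of a key, later reads are served by the fastest tier, so the remote store only sees traffic for keys that are not hot locally.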
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.