
Remote State Backend for Flink: Design, Optimization, and Deployment with Taishan KV Store

This article describes the motivation, challenges, design, and performance optimizations of a remote state backend for Flink that leverages Bilibili's Taishan distributed KV store to achieve storage‑compute separation, lighter checkpoints, faster rescaling, and improved resource utilization in large‑scale streaming jobs.

Big Data Technology & Architecture

1. Background

As part of an industry-wide push to cut costs and improve efficiency, Bilibili is migrating its online and real-time workloads to a cloud-native architecture with consolidated resource pools and schedulers. Real-time jobs need strong compute capabilities, while the machines that serve online workloads typically have little local storage or I/O capacity. Flink, as a stateful compute engine, needs both compute and storage, which prompted a redesign of its state backend around remote storage.

2. Pain Points

Flink's StateBackend stores task state in two forms: OperatorStateBackend (small, independent of data size) and KeyedStateBackend (large, growing with data volume and computation). Bilibili runs more than 4,000 streaming jobs. About 95% of them are SQL jobs, roughly half are stateful, and many hold more than 500 GB of state. The default RocksDBStateBackend keeps state on local high-performance disks and uploads incremental snapshots at checkpoint time, which leads to two main issues:

Low overall disk utilization because most tasks have small or no keyed state.

Slow rescaling for large‑state jobs, as state must be redistributed across TaskManagers, taking up to half an hour.

3. RemoteStateBackend

To address these issues, state is moved to a remote service, reducing Flink’s disk dependency and achieving storage‑compute separation. Bilibili’s self‑developed Taishan KV store (based on RocksDB and SparrowDB with Raft consistency) provides Java put/get/del/scan APIs and snapshot capabilities, making it suitable for a TaishanStateBackend.

3.1 State Switching Guarantee

KeyGroupId is calculated as MathUtils.murmurHash(key.hashCode()) % maxParallelism, so a given key always maps to the same KeyGroup and identical keys are processed by the same subtask. Rescaling changes only the KeyGroup range assigned to each subtask, not the KeyGroup of any key, so with state held in Taishan no costly data movement is needed.

Taishan's shard concept aligns with Flink's KeyGroup: the number of shards is fixed, and snapshots are created per shard and matched to Flink's checkpoint IDs.
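The key-group arithmetic can be illustrated with a minimal, self-contained sketch. It is not Flink code: `mix` is a hypothetical stand-in for `MathUtils.murmurHash`, and `subtaskFor` and `rangeFor` are illustrative names whose integer arithmetic is modeled on how Flink assigns contiguous key-group ranges to subtasks. The point it shows is that a key's group depends only on maxParallelism, while the parallelism only decides which subtask owns which range.

```java
/** Illustrative sketch of key-group assignment; all names are hypothetical. */
final class KeyGroupSketch {

    // Stand-in for MathUtils.murmurHash: a deterministic, non-negative bit mix.
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE;
    }

    // The key group depends on the key and maxParallelism only.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Which subtask owns a key group at the current parallelism.
    static int subtaskFor(int keyGroupId, int maxParallelism, int parallelism) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // Inclusive [start, end] range of key groups owned by one subtask.
    static int[] rangeFor(int subtask, int maxParallelism, int parallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        for (int parallelism : new int[] {2, 4}) {
            int owner = subtaskFor(keyGroup, maxParallelism, parallelism);
            int[] range = rangeFor(owner, maxParallelism, parallelism);
            System.out.println("parallelism=" + parallelism + " keyGroup=" + keyGroup
                    + " owner=" + owner + " range=[" + range[0] + "," + range[1] + "]");
        }
    }
}
```

Running it at parallelism 2 and then 4 prints the same key group both times; only the owning subtask and its range differ, which is why a rescale does not have to move data that is already partitioned by key group.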

3.2 StateBackend Topology Design

KeyedStateBackend manages the various state types (InternalValueState, InternalMapState, InternalListState, etc.) as well as PriorityQueueState. Checkpointing is the backbone of Flink's reliability, so implementing a RemoteStateBackend also means implementing the SnapshotStrategy and RestoreOperation interfaces.

3.3 TaishanStateBackend Architecture

Each KeyGroupId maps one-to-one to a Taishan shard ID.

Shard count equals Flink maxParallelism and is immutable.

Each keyed operator creates a dedicated Taishan table.

KV keys are prefixed with the operator identifier.

During checkpoint, each shard creates a snapshot with the same snapshot ID.

During restore, each shard is restored using its snapshot ID.
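The rules above can be sketched with an in-memory stand-in for one Taishan table. This is an illustration only: the class, its methods, and the `|` key separator are hypothetical and do not reflect the real Taishan client API. It shows one shard per key group, storage keys prefixed with the operator identifier, and a snapshot taken on every shard under one shared snapshot ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for one Taishan table; every name here is hypothetical. */
final class ShardedTableSketch {
    // One shard per key group; the count is fixed at construction time.
    private final List<TreeMap<String, String>> shards = new ArrayList<>();
    // snapshotId -> a copy of every shard taken under that ID.
    private final Map<Long, List<TreeMap<String, String>>> snapshots = new HashMap<>();

    ShardedTableSketch(int maxParallelism) {
        for (int i = 0; i < maxParallelism; i++) {
            shards.add(new TreeMap<>());
        }
    }

    // Storage keys carry the operator identifier as a prefix.
    static String storageKey(String operatorId, String userKey) {
        return operatorId + "|" + userKey;
    }

    void put(int keyGroupId, String operatorId, String userKey, String value) {
        shards.get(keyGroupId).put(storageKey(operatorId, userKey), value);
    }

    String get(int keyGroupId, String operatorId, String userKey) {
        return shards.get(keyGroupId).get(storageKey(operatorId, userKey));
    }

    // Checkpoint: every shard is snapshotted under the same snapshot ID.
    void snapshot(long snapshotId) {
        List<TreeMap<String, String>> copy = new ArrayList<>();
        for (TreeMap<String, String> shard : shards) {
            copy.add(new TreeMap<>(shard));
        }
        snapshots.put(snapshotId, copy);
    }

    // Restore: every shard is rolled back to the copy stored under the ID.
    void restore(long snapshotId) {
        List<TreeMap<String, String>> saved = snapshots.get(snapshotId);
        if (saved == null) {
            throw new IllegalArgumentException("unknown snapshot " + snapshotId);
        }
        for (int i = 0; i < shards.size(); i++) {
            shards.set(i, new TreeMap<>(saved.get(i)));
        }
    }
}
```

A real store would snapshot without copying the data in full; the copy here only keeps the sketch short.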

4. Optimizations

Metrics collected from RocksDBStateBackend showed millions of read/write requests per second and varying payload sizes. Moving state to remote storage increased RPC overhead, raising CPU usage. To mitigate this, a cache layer was added between Flink and Taishan, and batch‑write techniques were applied.

4.1 Write Optimization – Batch Write

Put/remove requests are queued in a BlockingQueue per subtask and flushed asynchronously when a threshold is reached or during checkpoint, reducing network write volume by orders of magnitude.
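A minimal sketch of this batching idea follows, under stated assumptions: `BatchWriterSketch`, `Write`, and the `remoteBatchCall` callback are hypothetical names, and the flush runs inline on the caller's thread for brevity, whereas the design described above hands it off asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical per-subtask write buffer: many puts/removes, few remote calls. */
final class BatchWriterSketch {

    static final class Write {
        final String key;
        final String value; // null marks a remove

        Write(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final BlockingQueue<Write> pending = new LinkedBlockingQueue<>();
    private final int threshold;
    // Stand-in for the remote batch-write RPC (assumption, not a Taishan API).
    private final Consumer<List<Write>> remoteBatchCall;

    BatchWriterSketch(int threshold, Consumer<List<Write>> remoteBatchCall) {
        this.threshold = threshold;
        this.remoteBatchCall = remoteBatchCall;
    }

    void put(String key, String value) {
        enqueue(new Write(key, value));
    }

    void remove(String key) {
        enqueue(new Write(key, null));
    }

    private void enqueue(Write write) {
        pending.add(write);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    // Called when the threshold is reached and again at checkpoint time,
    // so that no buffered write is missing from the snapshot.
    void flush() {
        List<Write> batch = new ArrayList<>();
        pending.drainTo(batch);
        if (!batch.isEmpty()) {
            remoteBatchCall.accept(batch);
        }
    }
}
```

With a threshold of 3, three puts trigger one remote call carrying three writes; a checkpoint-time `flush()` sends whatever is left, and an empty queue sends nothing.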

4.2 Read Optimization – Cache‑Accelerated Reads

A Caffeine cache (later replaced by an off-heap OHC cache) stores recently accessed state, dramatically cutting RPC calls. Reads that return null (sparse keys or periodic key patterns) never populate the cache, so they were initially filtered with Bloom filters. Because the filters' false-positive rate grew over time, a custom OffHeapBloomFilter with TTL-aware entries was introduced.
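The read path can be sketched as follows. This is a simplified illustration, not the production design: a `LinkedHashMap` in access order stands in for Caffeine or OHC, a plain `BitSet` with two probes stands in for the custom OffHeapBloomFilter (no TTL handling), and `remoteGet` is a hypothetical callback in place of the Taishan get RPC.

```java
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical read path: LRU cache first, Bloom filter second, RPC last. */
final class CachedReaderSketch {
    private final Map<String, String> cache;
    private final BitSet bloomBits;
    private final int bloomSize;
    private final Function<String, String> remoteGet; // stand-in for the remote get
    int remoteCalls = 0;

    CachedReaderSketch(final int cacheCapacity, int bloomSize,
                       Function<String, String> remoteGet) {
        // Access-ordered LinkedHashMap that drops the least recently used entry.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > cacheCapacity;
            }
        };
        this.bloomBits = new BitSet(bloomSize);
        this.bloomSize = bloomSize;
        this.remoteGet = remoteGet;
    }

    // Two probe positions derived from the key's hash (double hashing).
    private int slot(String key, int probe) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1;
        return Math.floorMod(h1 + probe * h2, bloomSize);
    }

    private boolean mightContain(String key) {
        return bloomBits.get(slot(key, 0)) && bloomBits.get(slot(key, 1));
    }

    // Every written key is recorded in the filter and kept in the cache.
    void recordWrite(String key, String value) {
        bloomBits.set(slot(key, 0));
        bloomBits.set(slot(key, 1));
        cache.put(key, value);
    }

    String get(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        if (!mightContain(key)) {
            return null; // definitely never written: skip the RPC
        }
        remoteCalls++;
        String value = remoteGet.apply(key);
        if (value != null) {
            cache.put(key, value);
        }
        return value;
    }
}
```

A plain Bloom filter like this one cannot forget keys, so its false-positive rate only rises as more keys are written, which matches the motivation given above for a TTL-aware replacement.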

4.3 Memory Model Optimization

The original per-operator Caffeine caches lived on the heap and caused an excessive object count (N × M) and GC pressure. Switching to an off-heap OHC cache shared across subtasks reduced GC overhead and improved eviction control, with custom hash table sizing and LRU eviction logic that is aware of state TTL.
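One way to read "LRU eviction aware of state TTL" is sketched below: when the cache is over capacity, expired entries are dropped first, and only if none exist is the least recently used entry evicted. This is an assumption about the policy, not the actual OHC customization, and it runs on the heap for simplicity. The class name and the explicit `nowMillis` parameter are hypothetical; the clock is passed in to keep the example deterministic.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical TTL-aware LRU policy; illustrates eviction order only. */
final class TtlAwareLruSketch {

    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // Access order: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, Entry> entries =
            new LinkedHashMap<>(16, 0.75f, true);
    private final int capacity;
    private final long ttlMillis;

    TtlAwareLruSketch(int capacity, long ttlMillis) {
        this.capacity = capacity;
        this.ttlMillis = ttlMillis;
    }

    void put(String key, String value, long nowMillis) {
        entries.put(key, new Entry(value, nowMillis + ttlMillis));
        if (entries.size() > capacity) {
            evict(nowMillis);
        }
    }

    String get(String key, long nowMillis) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (entry.expiresAtMillis <= nowMillis) {
            entries.remove(key); // expired state is never served
            return null;
        }
        return entry.value;
    }

    int size() {
        return entries.size();
    }

    private void evict(long nowMillis) {
        // First choice: drop entries whose TTL has already passed.
        boolean removedExpired = false;
        Iterator<Map.Entry<String, Entry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().expiresAtMillis <= nowMillis) {
                it.remove();
                removedExpired = true;
            }
        }
        if (removedExpired) {
            return;
        }
        // Fallback: drop the least recently used live entry.
        Iterator<String> keys = entries.keySet().iterator();
        keys.next();
        keys.remove();
    }
}
```

Dropping expired entries first keeps live, recently used state in the cache for longer than plain LRU would at the same capacity.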

5. Current Status and Future Work

Since November 2022, >100 stateful jobs have been migrated to TaishanStateBackend, achieving storage‑compute separation and faster rescaling, albeit with a slight increase in network‑related resource usage. Remaining challenges include high‑QPS scenarios with sparse keys, large key/value cases causing write stalls, and the need for tiered state storage that combines off‑heap cache, local SSD, and remote KV store.

Future plans involve adopting tiered state‑backend concepts, leveraging idle SSDs in “disk‑less” deployments, and expanding TaishanStateBackend coverage across more Flink jobs.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
