
Advancements and Optimizations of FlinkSQL at Bilibili

Bilibili’s FlinkSQL team has enhanced its Flink engine, which is based on 1.11 with features back‑ported from 1.15. The additions include Delay‑Join, table‑valued functions, projection push‑down, UDF reuse, object reuse, automatic mini‑batch and two‑phase aggregation, key‑group skew fixes, connector slot‑groups, real‑time projection with Hudi, and RocksDB state‑performance tuning. Planned work covers a remote state backend and deeper stream‑batch integration.

Bilibili Tech

The Bilibili real‑time platform maintains a FlinkSQL team responsible for developing the SQL capabilities of the Flink engine, supporting use cases such as data integration, real‑time data warehousing, model training, feature calculation, and incremental processing. Over 4,000 Flink jobs run in production, with more than 90% expressed as SQL, which reduces operational complexity and support costs.

The team builds on Flink 1.11 as the main version, focusing on stability, functionality, and performance, while also incorporating features from the newer 1.15 release to improve batch processing.

1. Delay‑Join

To improve join success rates when a dimension table (also a Flink job) experiences latency, a Delay‑Join capability was added. It allows delayed re‑joining with configurable back‑off retries and optional output suppression. The implementation stores pending join keys in KeyedStateBackend (requiring a preceding KeyBy) and uses Flink timers, avoiding OOM risks of in‑memory queues.

Delay‑Join parameters are set at the job level; the original article lists them in a configuration image that is not reproduced here.
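The retry mechanism can be pictured with a small simulation. This is a sketch under stated assumptions, not Bilibili's implementation: the class name, the exponential back-off schedule, and every parameter name are hypothetical, a plain dict stands in for keyed state, and a heap stands in for Flink timers.

```python
import heapq


class DelayJoinSketch:
    """Toy model of a delayed re-join (all names here are hypothetical)."""

    def __init__(self, dim_table, base_delay=1, max_retries=3, emit_unmatched=True):
        self.dim_table = dim_table            # key -> dimension value, may be filled late
        self.base_delay = base_delay          # first retry delay, doubled on every attempt
        self.max_retries = max_retries
        self.emit_unmatched = emit_unmatched  # False models output suppression
        self.pending = {}                     # stand-in for keyed state: id -> (key, row, attempt)
        self.timers = []                      # stand-in for Flink timers: heap of (fire_time, id)
        self.output = []
        self._next_id = 0

    def _try_join(self, key, row):
        if key in self.dim_table:
            self.output.append((row, self.dim_table[key]))
            return True
        return False

    def _schedule(self, now, key, row, attempt):
        delay = self.base_delay * 2 ** (attempt - 1)
        self._next_id += 1
        self.pending[self._next_id] = (key, row, attempt)
        heapq.heappush(self.timers, (now + delay, self._next_id))

    def on_row(self, now, key, row):
        """Join immediately if possible, otherwise park the row and set a timer."""
        if not self._try_join(key, row):
            self._schedule(now, key, row, attempt=1)

    def advance_to(self, now):
        """Fire every timer due at or before `now`, retrying the parked rows."""
        while self.timers and self.timers[0][0] <= now:
            fire_time, row_id = heapq.heappop(self.timers)
            key, row, attempt = self.pending.pop(row_id)
            if self._try_join(key, row):
                continue
            if attempt < self.max_retries:
                self._schedule(fire_time, key, row, attempt + 1)
            elif self.emit_unmatched:
                self.output.append((row, None))  # give up and emit without a match
```

Because parked rows live in state rather than in an unbounded in-memory queue, a slow dimension table increases state size and timer count instead of heap pressure, which is the trade-off the section describes.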

2. Table‑valued functions (TVF)

Traditional group windows lack mini‑batch support, two‑phase aggregation, and CUMULATE windows. TVF syntax introduced in Flink 1.13 resolves these issues, improving performance by about 30% after back‑porting to 1.11. The TVF implementation required changes to the ManagedMemory model, which were also back‑ported.
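To make the CUMULATE semantics concrete, the sketch below computes which windows a single timestamp is assigned to. It follows the behaviour documented for open-source Flink's CUMULATE window TVF (windows share a start and grow by one step up to the maximum size); the function name and the use of plain integers for time are illustrative assumptions.

```python
def cumulate_windows(ts, step, size):
    """Return the (window_start, window_end) pairs that contain `ts`.

    `size` is the maximum window size and must be a multiple of `step`.
    Window ends are exclusive.
    """
    if size % step != 0:
        raise ValueError("size must be an integral multiple of step")
    window_start = ts - ts % size
    # The first window end strictly greater than ts, aligned to a step boundary.
    first_end = window_start + ((ts - window_start) // step + 1) * step
    return [(window_start, end)
            for end in range(first_end, window_start + size + 1, step)]
```

For example, with a step of 10 and a maximum size of 60, a row at time 25 contributes to the windows ending at 30, 40, 50, and 60, all starting at 0.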

3. Projection‑PushDown

Many metric calculations involve wide tables where only a few columns are needed. Projection‑PushDown pushes column selection down to the source, reducing serialization overhead. The team added support for Kafka sources (which are row‑based) and fixed a bug where watermarks prevented push‑down. Example SQL before and after push‑down:

## Simplified SQL
CREATE TABLE source (
  `a` VARCHAR,
  `b` BIGINT,
  `c` VARCHAR,
  `d` BIGINT,
  `e` VARCHAR,
  `f` VARCHAR,
  `ts` AS TO_TIMESTAMP(f)
) WITH (
  'connector'='bsql-kafka', ...
);

CREATE TABLE STD_LOG (
  `a` VARCHAR,
  `b` BIGINT,
  `f` VARCHAR,
  `ts` Timestamp(3)
) WITH (
  'connector'='bsql-log',
  'infinite'='true'
);

INSERT INTO STD_LOG SELECT a,b,f,ts FROM source;

## Optimized Logical Plan without Projection‑PushDown
Sink(table=[...STD_LOG], fields=[a, b, f, ts])
+- Calc(select=[a, b, f, TO_TIMESTAMP(f) AS ts])
   +- TableSourceScan(table=[[...source]], fields=[a, b, c, d, e, f])

## Optimized Logical Plan with Projection‑PushDown
Sink(table=[...STD_LOG], fields=[a, b, f, ts])
+- Calc(select=[a, b, f, TO_TIMESTAMP(f) AS ts])
   +- TableSourceScan(table=[[...source, project=[a, b, f]]], fields=[a, b, f])
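The saving for a row-based source comes from converting only the projected fields. The sketch below illustrates the idea with a hypothetical pipe-delimited record; the delimiter, the `SCHEMA` list, and the `deserialize` function are assumptions made for illustration and are not the bsql-kafka connector's actual format or API. The computed column `ts` is left out because it is derived later in the Calc node.

```python
# Physical columns of the example source table, with a converter per column.
SCHEMA = [("a", str), ("b", int), ("c", str), ("d", int), ("e", str), ("f", str)]


def deserialize(raw, projected=None):
    """Decode one delimited record, converting only the projected columns.

    With projected=None every column is converted (no push-down).
    """
    row = {}
    for (name, cast), text in zip(SCHEMA, raw.split("|")):
        if projected is None or name in projected:
            row[name] = cast(text)
    return row
```

With the projection `{"a", "b", "f"}` from the plan above, columns `c`, `d`, and `e` are still present in the raw bytes but are never converted into row fields.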

4. UDF Reuse

Scalar UDFs that appear in both the WHERE and SELECT clauses were previously evaluated twice per row. By recording UDF results in the generated code and reusing them when the function is deterministic, the team eliminated the redundant computation. The feature is toggled with the parameter table.optimizer.reuse-user-defined-function.enabled. The code‑generation changes are illustrated in images in the original presentation, which are not reproduced here.
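The effect of the rewrite can be shown with a call counter. This is only an analogy for what the generated code does: `CountingUdf`, `filter_project_naive`, and `filter_project_reuse` are hypothetical names, and the real change lives in Flink's code generator rather than in user code.

```python
class CountingUdf:
    """Deterministic scalar function that records how often it is invoked."""

    def __init__(self):
        self.calls = 0

    def __call__(self, value):
        self.calls += 1
        return len(value)


def filter_project_naive(rows, udf):
    # Equivalent of: SELECT udf(x) FROM t WHERE udf(x) > 2, evaluated literally.
    return [udf(r) for r in rows if udf(r) > 2]


def filter_project_reuse(rows, udf):
    # The result is computed once per row and shared by the filter and the projection.
    out = []
    for r in rows:
        result = udf(r)
        if result > 2:
            out.append(result)
    return out
```

The reuse is only valid for deterministic functions: a non-deterministic UDF such as a random-number generator is expected to return a different value on each call, so its results must not be shared.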

5. Object Reuse

For high‑throughput jobs, serialization and copying between chained operators dominate the cost. Enabling Flink’s object‑reuse flag lets chained operators pass record references directly instead of copying them. This is safe only if any operator that holds on to a record after returning makes its own deep copy first. The team identified the points where this matters (asynchronous join, async sink) and then enabled object reuse globally, achieving multi‑fold performance gains on large records.
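The hazard that object reuse introduces can be reproduced in a few lines. The sketch is an analogy rather than Flink code: `reusing_source` and `BufferingOperator` are hypothetical, and the buffer stands in for an asynchronous operator that keeps records while requests are in flight.

```python
import copy


def reusing_source(n):
    """Emit n records through one mutable object, as a reusing upstream might."""
    record = {"id": None}
    for i in range(n):
        record["id"] = i
        yield record


class BufferingOperator:
    """Holds records beyond the current call, like an async join or async sink."""

    def __init__(self, copy_on_buffer):
        self.copy_on_buffer = copy_on_buffer
        self.buffer = []

    def process(self, record):
        # Without the copy, every buffered entry aliases the same object.
        self.buffer.append(copy.deepcopy(record) if self.copy_on_buffer else record)

    def buffered_ids(self):
        return [r["id"] for r in self.buffer]
```

Operators that consume a record fully before returning need no copy, which is why copying only at the buffering points keeps most of the gain.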

6. Automatic Mini‑Batch / Two‑Phase Aggregation

The compiler now analyses the DAG to decide whether to enable mini‑batch and two‑phase aggregation, mitigating group‑by skew while avoiding watermark propagation issues.
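Why the combination helps with a hot key can be seen in a minimal sketch of a COUNT aggregation. The function names and the fixed `batch_size` are assumptions for illustration; in open-source Flink the equivalent behaviour is configured manually through options such as table.exec.mini-batch.enabled and table.optimizer.agg-phase-strategy, whereas the section describes the planner making that choice automatically.

```python
from collections import Counter


def local_aggregate(batch):
    """Phase 1: pre-aggregate one mini-batch on the upstream subtask."""
    return Counter(batch)


def global_aggregate(partials):
    """Phase 2: merge partial counts after the shuffle."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


def two_phase_count(subtask_inputs, batch_size):
    """Return (final counts, number of partial records sent over the shuffle)."""
    partials = []
    for rows in subtask_inputs:
        for start in range(0, len(rows), batch_size):
            partials.append(local_aggregate(rows[start:start + batch_size]))
    shuffled = sum(len(p) for p in partials)
    return global_aggregate(partials), shuffled
```

With 15 input rows dominated by one key and a batch size of 4, the downstream aggregator receives 5 partial records instead of 15 raw rows, so the subtask that owns the hot key does proportionally less work.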

7. Key‑Group Skew Optimization

Imbalanced KeyGroup distribution caused sub‑tasks to process up to twice the data of others. By adjusting the maximum parallelism calculation to be more conservative, the skew was reduced to under 10% without harming checkpoint performance.
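The size of the imbalance follows from simple arithmetic on key-group ranges. The sketch below mirrors the range assignment and default max-parallelism formulas of open-source Flink's KeyGroupRangeAssignment as commonly documented; the article does not state what the team changed, so the larger max-parallelism value in the usage note is only an illustrative assumption.

```python
def default_max_parallelism(parallelism):
    """Default used by open-source Flink: 1.5x parallelism rounded up to a
    power of two, clamped to the range [128, 32768]."""
    candidate = parallelism + parallelism // 2
    rounded = 1 << (candidate - 1).bit_length()
    return min(max(rounded, 128), 32768)


def key_groups_per_subtask(parallelism, max_parallelism):
    """Number of key groups in the contiguous range owned by each subtask."""
    counts = []
    for i in range(parallelism):
        start = (i * max_parallelism + parallelism - 1) // parallelism
        end = ((i + 1) * max_parallelism - 1) // parallelism
        counts.append(end - start + 1)
    return counts


def skew_ratio(parallelism, max_parallelism):
    """Largest key-group share divided by the smallest."""
    counts = key_groups_per_subtask(parallelism, max_parallelism)
    return max(counts) / min(counts)
```

At parallelism 70 the default max parallelism is 128, so some subtasks own two key groups and others one, a ratio of 2. Raising max parallelism to 4096 gives each subtask 58 or 59 key groups, which brings the ratio below 1.1, at the cost of more key groups to track per checkpoint.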

8. Connector Slot‑Group

To address resource imbalance caused by Kafka connector serialization, a Slot‑Group configuration isolates connector tasks into dedicated slots, preventing them from contending with compute‑heavy operators.

9. Real‑time Projection (Stream‑Batch Integration)

Inspired by ClickHouse’s projection, the team built a real‑time projection framework on top of Flink + Hudi. It materializes frequently queried batch results in a streaming fashion, enabling sub‑second query responses. The architecture consists of three modules: Kyuubi (SQL entry), flink‑local‑executor (client that creates projections), and projection‑manager (metadata service). Users can hint queries for projection materialization:

SELECT /*+ OPTIONS('table.optimizer.materialization-enabled'='true') */
  window_end, f1, sum_f2
FROM (
  SELECT /*+ OPTIONS('table.optimizer.materialization.sub-query'='true') */
    CAST(window_end AS VARCHAR) AS window_end,
    f1,
    SUM(f2) AS sum_f2
  FROM TABLE(
    HOP(
      TABLE bili.dwd_data_example,
      DESCRIPTOR(ts),
      INTERVAL '5' MINUTES,
      INTERVAL '60' MINUTES
    )
  )
  GROUP BY window_end, f1
) c
WHERE f1 = 1 AND window_end = '2022-08-01 12:00:00.000';

The hint does not guarantee a rewrite: the query falls back to its original form when watermark lag is excessive.

10. Flink State Performance Foundations

The team enhanced RocksDB state monitoring by adding fine‑grained metrics for reads (get, seek, next) and writes (put, merge, delete), and introduced bloom filter optimizations to reduce the cost of reads for keys that do not exist (null reads).

Index and compression settings were tuned: partitioned indexes were enabled and LZ4 replaced the default Snappy compression, yielding roughly 20% lower CPU usage for large states (up to 10 TB).

Disk allocation was improved by scoring YARN nodes based on I/O load and selecting low‑load disks for state files, achieving more balanced I/O across machines.

Additional RocksDB parameter tweaks (partitioned index/filters, compression algorithm) further improved cache hit rates and reduced I/O pressure.

11. Ongoing Work

• Remote StateBackend: moving KeyedStateBackend to an external service to enable compute‑storage separation and reduce checkpoint latency.

• Further stream‑batch integration: extending real‑time projection to reporting, feature extraction, and other use cases.

