Databases 46 min read

Mastering ClickHouse with Flink: Table Engine Choices, Performance Tuning, and Real‑World Lessons

This article details how JDQ+Flink+Elasticsearch was extended with ClickHouse for real‑time reporting, covering table‑engine selection, Flink sink implementation, query optimization techniques, high‑CPU shard analysis, and practical strategies to ensure high concurrency and stable performance in production.

dbaplus Community
dbaplus Community
dbaplus Community
Mastering ClickHouse with Flink: Table Engine Choices, Performance Tuning, and Real‑World Lessons

1. Introduction

Jingxi Da's technology team uses a JDQ+Flink+Elasticsearch architecture for real‑time data reports, but Elasticsearch showed limitations such as crashes under high‑frequency deep pagination, inaccurate distinct counts, and degraded aggregation performance. ClickHouse was introduced to address these issues.

2. Problems Encountered

Choosing the appropriate table engine

How Flink writes to ClickHouse

Why ClickHouse queries are 1–2 minutes slower than Elasticsearch

Whether to write to distributed or local tables

High CPU usage on a single shard

Identifying which SQL statements consume CPU

Optimizing slow SQL

Ensuring ClickHouse can handle high concurrency

3. Table Engine Selection and Query Strategies

ClickHouse offers four engine families: Log, MergeTree, Integration, and Special. MergeTree and its variants (ReplacingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, SummingMergeTree, AggregatingMergeTree) are the most commonly used for large‑scale inserts and queries.

For the use case requiring upserts, ReplacingMergeTree is suitable because it can deduplicate rows during background merges or via query‑time options.

CREATE TABLE test_MergeTree (
  orderNo String,
  number Int16,
  createTime DateTime,
  updateTime DateTime
) ENGINE = MergeTree()
PARTITION BY createTime
ORDER BY (orderNo)
PRIMARY KEY (orderNo);

INSERT INTO test_MergeTree VALUES('1','20','2021-01-01 00:00:00','2021-01-01 00:00:00');
INSERT INTO test_MergeTree VALUES('1','30','2021-01-01 00:00:00','2021-01-01 01:00:00');

Because MergeTree does not deduplicate on primary key, both rows remain after insertion, which is undesirable for order‑level metrics.

CREATE TABLE test_ReplacingMergeTree (
  orderNo String,
  version Int16,
  number Int16,
  createTime DateTime,
  updateTime DateTime
) ENGINE = ReplacingMergeTree(version)
PARTITION BY createTime
ORDER BY (orderNo)
PRIMARY KEY (orderNo);

INSERT INTO test_ReplacingMergeTree VALUES('1',1,'20','2021-01-01 00:00:00','2021-01-01 00:00:00');
INSERT INTO test_ReplacingMergeTree VALUES('1',2,'30','2021-01-01 00:00:00','2021-01-01 01:00:00');
INSERT INTO test_ReplacingMergeTree VALUES('1',3,'30','2021-01-02 00:00:00','2021-01-01 01:00:00');

Deduplication strategies:

If no version column is defined, the last inserted row for a given primary key is kept.

If a version column is defined, the row with the highest version value is kept, regardless of insert order.

Query‑time deduplication can be performed with FINAL or argMax(). FINAL works only on local tables, while argMax() can deduplicate across shards.

4. How Flink Writes to ClickHouse

4.1 Flink version considerations

Before Flink 1.11 the JDBC connector package was flink-jdbc; from 1.11 onward it is flink-connector-jdbc. Only the newer package supports writing a DataStream to ClickHouse, so the project upgraded to Flink ≥ 1.11.

4.2 Constructing the ClickHouse sink

public static SinkFunction getSink(String clusterPrefix, String sql) {
    String clusterUrl = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_URL);
    String clusterUsername = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_USER_NAME);
    String clusterPassword = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_PASSWORD);
    return JdbcSink.sink(
        sql,
        new CkSinkBuilder<>(),
        new JdbcExecutionOptions.Builder().withBatchSize(200000).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
            .withUrl(clusterUrl)
            .withUsername(clusterUsername)
            .withPassword(clusterPassword)
            .build());
}

The sink parameters are:

sql : prepared‑statement with placeholders, e.g. INSERT INTO demo (id, name) VALUES (?, ?) new CkSinkBuilder<>() : maps stream records to PreparedStatement JdbcExecutionOptions : controls batch size or interval

JdbcConnectionOptions : driver, URL, credentials

5. Flink Writing Strategies

Initially the batch size was set to 200 000 rows, causing data to be flushed only when that threshold or a checkpoint occurred. This made ClickHouse appear slower than Elasticsearch. The strategy was changed to flush every 30 seconds ( withBatchIntervalMs(30000)) to reduce latency while avoiding excessive checkpoint‑driven bursts.

6. Distributed vs. Local Tables

The team chose to write to a distributed table because it required no code changes and performed adequately during the Double‑Eleven peak (30 M rows per day, QPS 93, CPU ≤ 60 %). Distributed tables route rows to shards based on the sharding key; however, all client‑side data first lands on the shard the client connects to, which can create I/O bottlenecks if the sharding rule is uneven.

7. High CPU Usage on a Specific Shard

CPU spikes were traced to data skew: one shard stored far more rows because the original hash key (grid‑station code) had low cardinality. Switching the sharding key to sipHash64(docId) (a unique business identifier) balanced the load.

8. Locating CPU‑Intensive SQL

Two approaches were used:

Grafana dashboards highlighted high‑frequency queries; investigation revealed a front‑end call that triggered many similar SQLs with different status values.

Analyzing system.query_log (or a distributed copy system.query_log_all) to find queries with large read_rows or long execution times, then drilling down with EXPLAIN or trace‑level logs.

9. Slow Query Optimization

9.1 Using service logs for deep analysis

Trace‑level logs from clickhouse-client reveal which parts and marks are read, whether primary or partition indexes are used, and memory consumption. Example log excerpts showed full‑table scans because neither primary nor partition indexes were applied.

9.2 Table design improvements

Avoid Nullable columns when possible, as they require extra files and cannot be indexed.

Choose an appropriate partition granularity (day or month) based on query patterns; too fine creates many tiny partitions, too coarse forces scans of irrelevant data.

Pick a sharding rule that distributes rows evenly (e.g., sipHash64(docId)).

9.3 Indexes and query patterns

Secondary indexes (minmax, set, bloom_filter, etc.) can dramatically reduce scanned data. Example:

ALTER TABLE wms.wms_order_sku_local ON CLUSTER default ADD INDEX belongProvinceCode_idx belongProvinceCode TYPE set(0) GRANULARITY 5;
ALTER TABLE wms.wms_order_sku_local ON CLUSTER default ADD INDEX productionEndTime_idx productionEndTime TYPE minmax GRANULARITY 5;

After adding an index, the same query scanned only 38 K rows instead of 9 M.

9.4 Query‑level techniques

Use FINAL or argMax() for deduplication; FINAL is faster on local tables.

Prefer PREWHERE for highly selective, immutable columns to reduce I/O before reading other columns.

Combine multiple aggregations with conditional functions ( sumIf, countIf) to avoid repeated scans.

Ensure WHERE / GROUP BY column order matches the ORDER BY prefix of the table to enable index usage.

9.5 Benchmarking

Use clickhouse-benchmark to compare QPS and latency before and after optimizations.

10. Ensuring High Concurrency and Availability

Key tactics:

Lower max_threads (e.g., to 8 on a 32‑core node) to limit per‑query CPU usage and increase overall throughput.

Cache query results for a short period to smooth burst traffic.

Offload heavy aggregations to Elasticsearch or materialized views when real‑time freshness is not required.

Stress tests showed that reducing max_threads improves overall QPS at the cost of higher tail latency, allowing operators to choose a balance that fits their SLA.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

FlinkPerformanceOptimizationClickHouseMergeTreeReplacingMergeTreeDistributedTablesSQLTuning
dbaplus Community
Written by

dbaplus Community

Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.