Mastering ClickHouse with Flink: Table Engine Choices, Performance Tuning, and Real‑World Lessons
This article details how JDQ+Flink+Elasticsearch was extended with ClickHouse for real‑time reporting, covering table‑engine selection, Flink sink implementation, query optimization techniques, high‑CPU shard analysis, and practical strategies to ensure high concurrency and stable performance in production.
1. Introduction
Jingxi Da's technology team uses a JDQ+Flink+Elasticsearch architecture for real‑time data reports, but Elasticsearch showed limitations such as crashes under high‑frequency deep pagination, inaccurate distinct counts, and degraded aggregation performance. ClickHouse was introduced to address these issues.
2. Problems Encountered
Choosing the appropriate table engine
How Flink writes to ClickHouse
Why data becomes queryable in ClickHouse 1–2 minutes later than in Elasticsearch
Whether to write to distributed or local tables
High CPU usage on a single shard
Identifying which SQL statements consume CPU
Optimizing slow SQL
Ensuring ClickHouse can handle high concurrency
3. Table Engine Selection and Query Strategies
ClickHouse offers four engine families: Log, MergeTree, Integration, and Special. MergeTree and its variants (ReplacingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, SummingMergeTree, AggregatingMergeTree) are the most commonly used for large‑scale inserts and queries.
For the use case requiring upserts, ReplacingMergeTree is suitable because it can deduplicate rows during background merges or via query‑time options.
CREATE TABLE test_MergeTree (
    orderNo    String,
    number     Int16,
    createTime DateTime,
    updateTime DateTime
) ENGINE = MergeTree()
-- Partition by day; partitioning on the raw DateTime would create one partition per distinct second.
PARTITION BY toYYYYMMDD(createTime)
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
-- Two versions of the same order; number is Int16, so it is passed as a numeric literal.
INSERT INTO test_MergeTree VALUES ('1', 20, '2021-01-01 00:00:00', '2021-01-01 00:00:00');
INSERT INTO test_MergeTree VALUES ('1', 30, '2021-01-01 00:00:00', '2021-01-01 01:00:00');
Because MergeTree does not deduplicate on the primary key, both rows remain after insertion, which is undesirable for order-level metrics.
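CREATE TABLE test_ReplacingMergeTree (
    orderNo    String,
    version    Int16,
    number     Int16,
    createTime DateTime,
    updateTime DateTime
) ENGINE = ReplacingMergeTree(version)
-- Partition by day; partitioning on the raw DateTime would create one partition per distinct second.
PARTITION BY toYYYYMMDD(createTime)
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
-- Versions 1 and 2 share a partition and can be collapsed by a background merge.
INSERT INTO test_ReplacingMergeTree VALUES ('1', 1, 20, '2021-01-01 00:00:00', '2021-01-01 00:00:00');
INSERT INTO test_ReplacingMergeTree VALUES ('1', 2, 30, '2021-01-01 00:00:00', '2021-01-01 01:00:00');
-- Version 3 has a different createTime day, so it lands in another partition; background merges never combine rows across partitions.
INSERT INTO test_ReplacingMergeTree VALUES ('1', 3, 30, '2021-01-02 00:00:00', '2021-01-01 01:00:00');
Deduplication strategies: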
If no version column is defined, the last inserted row for a given sorting key (the ORDER BY columns, here orderNo) is kept.
If a version column is defined, the row with the highest version value is kept, regardless of insert order.
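Background merges run at an unpredictable time and only within a partition, so queries that need exact results should deduplicate at query time with FINAL or argMax(). On a distributed table, FINAL is applied on each shard's local table separately, so it only removes duplicates that live on the same shard, while argMax() with GROUP BY can also deduplicate across shards.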
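The two query-time options can be sketched against the test_ReplacingMergeTree table defined above. The result aliases (latestVersion, latestNumber and so on) are illustrative names chosen here, not taken from the original article.

```sql
-- Option 1: FINAL merges rows with the same sorting key while reading.
SELECT orderNo, version, number, createTime, updateTime
FROM test_ReplacingMergeTree FINAL
WHERE orderNo = '1';

-- Option 2: argMax() picks, per order, the column values of the row
-- with the highest version; GROUP BY makes it work across shards too.
SELECT
    orderNo,
    max(version)                AS latestVersion,
    argMax(number, version)     AS latestNumber,
    argMax(createTime, version) AS latestCreateTime,
    argMax(updateTime, version) AS latestUpdateTime
FROM test_ReplacingMergeTree
GROUP BY orderNo;
```

The aliases deliberately differ from the column names, which avoids any ambiguity between an alias and the column it is computed from.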
4. How Flink Writes to ClickHouse
4.1 Flink version considerations
Before Flink 1.11 the JDBC connector package was flink-jdbc; from 1.11 onward it is flink-connector-jdbc. Only the newer package supports writing a DataStream to ClickHouse, so the project upgraded to Flink ≥ 1.11.
4.2 Constructing the ClickHouse sink
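import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
// Builds a JDBC sink for the ClickHouse cluster identified by clusterPrefix.
// LoadPropertiesUtil, the CLUSTER_* constants and CkSinkBuilder are the project's own classes;
// CkSinkBuilder<T> is assumed to implement JdbcStatementBuilder<T>.
public static <T> SinkFunction<T> getSink(String clusterPrefix, String sql) {
    String clusterUrl = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_URL);
    String clusterUsername = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_USER_NAME);
    String clusterPassword = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_PASSWORD);
    return JdbcSink.sink(
        sql,                    // INSERT statement with ? placeholders
        new CkSinkBuilder<>(),  // binds each stream record to the PreparedStatement
        // Initial setting: flush only after 200,000 buffered rows (or on checkpoint).
        JdbcExecutionOptions.builder().withBatchSize(200000).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
            .withUrl(clusterUrl)
            .withUsername(clusterUsername)
            .withPassword(clusterPassword)
            .build());
}
The sink parameters are: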
sql : prepared statement with placeholders, e.g. INSERT INTO demo (id, name) VALUES (?, ?)
new CkSinkBuilder<>() : maps stream records to the PreparedStatement
JdbcExecutionOptions : controls batch size or flush interval
JdbcConnectionOptions : driver, URL, credentials
5. Flink Writing Strategies
Initially only the batch size was set (200,000 rows), so data was flushed only when the buffer reached that threshold or when a checkpoint fired. New rows therefore became visible in ClickHouse later than in Elasticsearch, which made ClickHouse appear slower. The strategy was changed to also flush every 30 seconds (withBatchIntervalMs(30000)), which reduces latency while avoiding large checkpoint-driven bursts.
6. Distributed vs. Local Tables
The team chose to write to a distributed table because it required no code changes and performed adequately during the Double-Eleven peak (30 M rows per day, QPS 93, CPU ≤ 60 %). A distributed table routes rows to shards based on the sharding key; however, the inserted data is first written to the node the client connects to and only then forwarded to the target shards, which can create I/O bottlenecks, especially if the sharding rule is uneven.
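A minimal sketch of the local/distributed pairing described here. The table names demo_order_local and demo_order_all and the column list are hypothetical; the database wms, the cluster name default, and the sharding expression sipHash64(docId) are the ones that appear elsewhere in this article.

```sql
-- Local table that physically stores the data on every shard (hypothetical schema).
CREATE TABLE wms.demo_order_local ON CLUSTER default
(
    docId      String,
    version    UInt32,
    status     UInt8,
    createTime DateTime
)
ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMMDD(createTime)
ORDER BY docId;

-- Distributed table: stores nothing itself, only routes reads and writes.
CREATE TABLE wms.demo_order_all ON CLUSTER default
AS wms.demo_order_local
ENGINE = Distributed('default', 'wms', 'demo_order_local', sipHash64(docId));

-- Writing through the distributed table: the receiving node buffers the row
-- and forwards it to the shard selected by the sharding expression.
INSERT INTO wms.demo_order_all VALUES ('DOC-1', 1, 10, '2021-11-11 00:00:00');
```

Writing to the local tables directly would avoid the forwarding hop, but the application would then have to pick the shard itself, which is the code change the team wanted to avoid.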
7. High CPU Usage on a Specific Shard
CPU spikes were traced to data skew: one shard stored far more rows because the original hash key (grid‑station code) had low cardinality. Switching the sharding key to sipHash64(docId) (a unique business identifier) balanced the load.
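One way to confirm this kind of skew is to compare row counts per host using system.parts. This is a sketch, assuming the cluster is named default and the local table is wms.wms_order_sku_local, as in the index example later in this article.

```sql
-- Rows and on-disk size per host for one local table.
-- A host with far more rows than the others points to an uneven sharding key.
SELECT
    hostName()                             AS host,
    sum(rows)                              AS total_rows,
    formatReadableSize(sum(bytes_on_disk)) AS size_on_disk
FROM clusterAllReplicas('default', system.parts)
WHERE database = 'wms'
  AND table = 'wms_order_sku_local'
  AND active
GROUP BY host
ORDER BY total_rows DESC;
```

Because clusterAllReplicas queries every replica, replicas of the same shard show up as separate hosts with roughly equal counts; the comparison that matters is between shards.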
8. Locating CPU‑Intensive SQL
Two approaches were used:
Grafana dashboards highlighted high‑frequency queries; investigation revealed a front‑end call that triggered many similar SQLs with different status values.
Analyzing system.query_log (or a distributed copy system.query_log_all) to find queries with large read_rows or long execution times, then drilling down with EXPLAIN or trace‑level logs.
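The query-log analysis can be sketched as follows. The one-hour window and the LIMIT are arbitrary choices, and the normalized_query_hash column is only available on reasonably recent ClickHouse versions; on older servers, grouping by query text is a fallback.

```sql
-- Group finished queries by shape (literals such as status values are
-- normalized away) and rank them by the amount of data they read.
SELECT
    normalized_query_hash,
    any(query)                    AS sample_query,
    count()                       AS executions,
    sum(read_rows)                AS total_read_rows,
    round(avg(query_duration_ms)) AS avg_duration_ms,
    max(memory_usage)             AS peak_memory_bytes
FROM system.query_log
WHERE type = 'QueryFinish'
  AND event_date >= today() - 1
  AND event_time >= now() - INTERVAL 1 HOUR
GROUP BY normalized_query_hash
ORDER BY total_read_rows DESC
LIMIT 20;
```

Grouping by query shape is what surfaces the pattern described above, where many near-identical statements differ only in a status value.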
9. Slow Query Optimization
9.1 Using service logs for deep analysis
Trace-level server logs, which clickhouse-client can display for the current session by setting send_logs_level = 'trace', reveal which parts and marks are read, whether primary or partition indexes are used, and how much memory is consumed. Example log excerpts showed full-table scans because neither primary nor partition indexes were applied.
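A sketch of how such an investigation might look in a clickhouse-client session. The filter on productionEndTime and its DateTime type are assumptions made for illustration; EXPLAIN indexes = 1 requires a ClickHouse version that supports it.

```sql
-- Stream server-side trace logs for this session to the client.
SET send_logs_level = 'trace';

-- Run the suspect query; the log lines report how many parts and marks
-- were selected by the partition key and by the primary key.
SELECT count()
FROM wms.wms_order_sku_local
WHERE productionEndTime >= toDateTime('2021-11-11 00:00:00');

-- Alternative that does not execute the query: show which indexes
-- would be used and how many granules each one keeps.
EXPLAIN indexes = 1
SELECT count()
FROM wms.wms_order_sku_local
WHERE productionEndTime >= toDateTime('2021-11-11 00:00:00');
```

If the selected marks are close to the total marks of the table, the filter is not benefiting from any index.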
9.2 Table design improvements
Avoid Nullable columns when possible: each one needs an extra file for the null mask, and by default a Nullable column cannot be part of the sorting key.
Choose an appropriate partition granularity (day or month) based on query patterns; too fine creates many tiny partitions, too coarse forces scans of irrelevant data.
Pick a sharding rule that distributes rows evenly (e.g., sipHash64(docId)).
9.3 Indexes and query patterns
Secondary indexes (minmax, set, bloom_filter, etc.) can dramatically reduce scanned data. Example:
ALTER TABLE wms.wms_order_sku_local ON CLUSTER default ADD INDEX belongProvinceCode_idx belongProvinceCode TYPE set(0) GRANULARITY 5;
ALTER TABLE wms.wms_order_sku_local ON CLUSTER default ADD INDEX productionEndTime_idx productionEndTime TYPE minmax GRANULARITY 5;
After adding an index, the same query scanned only 38 K rows instead of 9 M.
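ADD INDEX only affects parts written after the statement runs, so existing data is not covered until the index is materialized. A sketch of the follow-up step for the two indexes above; the system.data_skipping_indices check assumes a version that provides that system table.

```sql
-- Build the new indexes for parts that already exist (runs as a mutation).
ALTER TABLE wms.wms_order_sku_local ON CLUSTER default MATERIALIZE INDEX belongProvinceCode_idx;
ALTER TABLE wms.wms_order_sku_local ON CLUSTER default MATERIALIZE INDEX productionEndTime_idx;

-- Confirm that the indexes are defined on the table.
SELECT name, type, expr, granularity
FROM system.data_skipping_indices
WHERE database = 'wms'
  AND table = 'wms_order_sku_local';
```

Materialization rewrites index files for every existing part, so on a large table it is worth scheduling outside peak hours.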
9.4 Query‑level techniques
Use FINAL or argMax() for deduplication; FINAL is faster on local tables.
Prefer PREWHERE for highly selective, immutable columns to reduce I/O before reading other columns.
Combine multiple aggregations with conditional functions ( sumIf, countIf) to avoid repeated scans.
Filter (and, where possible, group) on the leading columns of the table's ORDER BY key: the primary index can only skip data when the conditions cover a prefix of the sorting key.
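The PREWHERE and conditional-aggregation techniques from this list can be combined in one statement. This is a sketch: the status and number columns, the status codes 10 and 20, and the date are hypothetical, while the table and the other two columns come from the index example above.

```sql
-- One scan produces several metrics instead of one query per status value.
SELECT
    belongProvinceCode,
    countIf(status = 10)       AS created_orders,
    countIf(status = 20)       AS shipped_orders,
    sumIf(number, status = 20) AS shipped_quantity
FROM wms.wms_order_sku_local
-- PREWHERE reads only productionEndTime first and loads the remaining
-- columns just for the blocks that pass the filter.
PREWHERE productionEndTime >= toDateTime('2021-11-11 00:00:00')
WHERE belongProvinceCode != ''
GROUP BY belongProvinceCode;
```

This also addresses the front-end pattern found during the CPU investigation, where many similar statements differed only in the status value.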
9.5 Benchmarking
Use clickhouse-benchmark to compare QPS and latency before and after optimizations.
10. Ensuring High Concurrency and Availability
Key tactics:
Lower max_threads (e.g., to 8 on a 32‑core node) to limit per‑query CPU usage and increase overall throughput.
Cache query results for a short period to smooth burst traffic.
Offload heavy aggregations to Elasticsearch or materialized views when real‑time freshness is not required.
Stress tests showed that reducing max_threads improves overall QPS at the cost of higher tail latency, allowing operators to choose a balance that fits their SLA.
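The max_threads limit can be applied at different scopes. A minimal sketch using the value 8 from the list above; the table name is the one used in the index example.

```sql
-- Session scope: applies to every following query in this connection.
SET max_threads = 8;

-- Query scope: overrides the setting for a single statement only.
SELECT count()
FROM wms.wms_order_sku_local
SETTINGS max_threads = 8;

-- Check the effective value and whether it differs from the default.
SELECT name, value, changed
FROM system.settings
WHERE name = 'max_threads';
```

For a service with many clients, setting the limit in the user's settings profile is usually more robust than relying on each caller to pass it.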
dbaplus Community
