ClickHouse & Flink: Choosing Engines, Tuning Queries, and Scaling Concurrency
This article details how JDQ, Flink, and ClickHouse were integrated to replace Elasticsearch for real‑time reporting, covering table‑engine selection, Flink sink implementation, performance bottlenecks, CPU hot‑spots, query optimization techniques, and strategies for handling high concurrency while ensuring data consistency and system stability.
1. Introduction
The JingxiDa technology department used a JDQ + Flink + Elasticsearch architecture for real‑time data reports. As the business grew, Elasticsearch showed its limitations: poor support for frequent deep pagination, inaccurate distinct counts, and degraded aggregation performance. This prompted the introduction of ClickHouse.
2. Problems Encountered
Which table engine to use
How Flink writes to ClickHouse
Why data in ClickHouse lags Elasticsearch by 1‑2 minutes
Whether to write to distributed or local tables
Why a specific shard shows high CPU usage
How to locate CPU‑consuming SQL statements
How to optimize identified slow SQL
How to ensure ClickHouse remains available under high concurrency
3. Table Engine Selection and Query Approach
Before choosing an engine, we identified that upsert support was required. Flink builds a wide table, and with Elasticsearch an upsert overwrites the previously written document, but ClickHouse has no native upsert. ClickHouse offers four engine families: Log, MergeTree, Integration, and Special.
Log: suitable for <1 M rows, no index, poor range queries.
Integration: for importing external data (Kafka, HDFS, JDBC, MySQL).
Special: e.g., Memory (in‑memory, lost on restart) and File (stores local files).
MergeTree family: provides primary‑key index, partitioning, replication, sampling, and supports massive writes.
Our workload (≈25 M new rows per day and a need for a primary‑key index) points to the MergeTree family.
3.1 MergeTree
Supports the full ClickHouse SQL feature set. The primary key does not enforce uniqueness: rows with the same primary key can coexist.
CREATE TABLE test_MergeTree (
orderNo String,
number Int16,
createTime DateTime,
updateTime DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(createTime)
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
Each INSERT writes a new data part, so inserting two rows with the same orderNo in separate inserts creates two parts. A background merge may later combine the parts, but both rows remain, because plain MergeTree does not deduplicate.
3.2 ReplacingMergeTree
Deduplicates rows that share the same sorting key (ORDER BY) during background merges. If no version column is defined, the last inserted row is kept. With a version column, the row with the highest version is retained.
CREATE TABLE test_ReplacingMergeTree (
orderNo String,
version Int16,
number Int16,
createTime DateTime,
updateTime DateTime
) ENGINE = ReplacingMergeTree(version)
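PARTITION BY toYYYYMMDD(createTime)
ORDER BY (orderNo)
PRIMARY KEY (orderNo);
Deduplication happens only when parts are merged, and only within a partition. At any given moment, the physical data may therefore still contain duplicate orderNo rows. Reading with FINAL (SELECT ... FROM test_ReplacingMergeTree FINAL) or with argMax (for example, argMax(number, version) ... GROUP BY orderNo) returns the deduplicated result regardless of merge state.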
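The merge-time rule can be simulated in a few lines. The Java sketch below models what a ReplacingMergeTree(version) merge, or a FINAL read, does to rows sharing one orderNo:

- the highest version wins;
- on a version tie, the later-inserted row wins.

ReplacingMergeSim and OrderRow are hypothetical classes for illustration, not ClickHouse code. The sketch also ignores partitions, which real merges respect.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of ReplacingMergeTree(version) semantics; not ClickHouse code.
class ReplacingMergeSim {
    /**
     * Keeps one row per orderNo: the highest version wins, and on equal versions
     * the later-inserted row wins. Rows must be passed in insertion order.
     */
    static List<OrderRow> merge(List<OrderRow> rowsInInsertOrder) {
        Map<String, OrderRow> survivors = new LinkedHashMap<>();
        for (OrderRow row : rowsInInsertOrder) {
            OrderRow current = survivors.get(row.orderNo);
            // ">=" lets a later row win a version tie ("last inserted wins").
            if (current == null || row.version >= current.version) {
                survivors.put(row.orderNo, row);
            }
        }
        return new ArrayList<>(survivors.values());
    }

    public static void main(String[] args) {
        List<OrderRow> inserted = List.of(
                new OrderRow("A001", 1, 10),
                new OrderRow("A001", 3, 30),
                new OrderRow("A001", 2, 20), // late-arriving older version: discarded
                new OrderRow("B002", 1, 5));
        // Until a merge runs, all four rows exist physically; a FINAL read sees only the survivors.
        System.out.println(merge(inserted)); // [A001(v3, number=30), B002(v1, number=5)]
    }
}

// Only the columns that matter for deduplication.
class OrderRow {
    final String orderNo;
    final int version;
    final int number;

    OrderRow(String orderNo, int version, int number) {
        this.orderNo = orderNo;
        this.version = version;
        this.number = number;
    }

    @Override
    public String toString() {
        return orderNo + "(v" + version + ", number=" + number + ")";
    }
}
```

The argMax(number, version) form computes the same "highest version wins" value per key at query time, without waiting for a merge.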
3.3 CollapsingMergeTree / VersionedCollapsingMergeTree
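Uses a sign column: 1 marks a state row and -1 marks a cancel row. During merges, a state row and a cancel row with the same sorting key collapse, and both are removed. VersionedCollapsingMergeTree adds a version column. This lets rows arrive in any order, instead of requiring each cancel row to follow its state row.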
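Collapsing only happens at an unspecified merge time, so the ClickHouse documentation suggests aggregating with the sign column rather than relying on already-collapsed data, for example sum(number * sign) ... GROUP BY ... HAVING sum(sign) > 0. The Java sketch below mirrors that query over in-memory rows. CollapsingSim and SignedRow are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the sign-based aggregation used with CollapsingMergeTree.
class CollapsingSim {
    /**
     * Equivalent of:
     *   SELECT orderNo, sum(number * sign) FROM t GROUP BY orderNo HAVING sum(sign) > 0
     * The result is the same whether or not ClickHouse has already collapsed the rows.
     */
    static Map<String, Integer> collapse(List<SignedRow> rows) {
        Map<String, int[]> acc = new TreeMap<>(); // orderNo -> {sum(sign), sum(number * sign)}
        for (SignedRow r : rows) {
            int[] a = acc.computeIfAbsent(r.orderNo, k -> new int[2]);
            a[0] += r.sign;
            a[1] += r.number * r.sign;
        }
        Map<String, Integer> live = new TreeMap<>();
        for (Map.Entry<String, int[]> e : acc.entrySet()) {
            if (e.getValue()[0] > 0) { // key still has an uncancelled state row
                live.put(e.getKey(), e.getValue()[1]);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        List<SignedRow> rows = List.of(
                new SignedRow("A001", 10, 1),   // original state
                new SignedRow("A001", 10, -1),  // cancels it
                new SignedRow("A001", 25, 1),   // new state
                new SignedRow("B002", 7, 1),
                new SignedRow("B002", 7, -1));  // B002 deleted
        System.out.println(collapse(rows)); // {A001=25}
    }
}

class SignedRow {
    final String orderNo;
    final int number;
    final int sign; // 1 = state row, -1 = cancel row

    SignedRow(String orderNo, int number, int sign) {
        this.orderNo = orderNo;
        this.number = number;
        this.sign = sign;
    }
}
```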
3.4 Engine Summary
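The candidates compared were MergeTree, ReplacingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree, SummingMergeTree, and AggregatingMergeTree. SummingMergeTree and AggregatingMergeTree pre-aggregate rows during merges, so they do not keep the detail rows. Because we need both detail rows and aggregated metrics, we chose ReplacingMergeTree and query it with FINAL.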
4. Flink Writing to ClickHouse
4.1 Flink JDBC Connector Version
Before Flink 1.11 the connector package is flink-jdbc; from 1.11 onward it is flink-connector-jdbc. The newer version supports DataStream writes to ClickHouse.
4.2 Constructing ClickHouse Sink
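import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// CkSinkBuilder (the JdbcStatementBuilder that binds fields to the INSERT placeholders)
// and LoadPropertiesUtil are project-specific classes from the original code base.
public static <T> SinkFunction<T> getSink(String clusterPrefix, String sql) {
    // Look up the target cluster's connection settings by prefix
    String clusterUrl = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_URL);
    String clusterUsername = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_USER_NAME);
    String clusterPassword = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_PASSWORD);
    return JdbcSink.sink(sql, new CkSinkBuilder<>(),
            // Flush when 200,000 rows are buffered; a Flink checkpoint also forces a flush
            new JdbcExecutionOptions.Builder().withBatchSize(200000).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") // legacy ClickHouse JDBC driver
                    .withUrl(clusterUrl)
                    .withUsername(clusterUsername)
                    .withPassword(clusterPassword)
                    .build());
}
The sink buffers up to 200,000 rows per batch. When traffic is low the batch rarely fills, so in practice rows are written when a Flink checkpoint forces a flush.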
4.3 Write Strategy Optimization
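The original strategy flushed every 200,000 rows or on each checkpoint, which delayed low‑volume streams by up to 1‑2 minutes. Adding a time‑based flush interval (e.g., 30 s) reduces that latency. The interval should not be too short, because every INSERT creates a new data part, and on replicated tables each part is also registered in ZooKeeper. Very frequent small batches therefore increase ZooKeeper load.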
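In Flink's JDBC connector, the corresponding setting is JdbcExecutionOptions.Builder's withBatchIntervalMs, which schedules periodic flushes in addition to the size limit. The sketch below models only the "size or time, whichever comes first" decision, so it can run without Flink. It uses a hypothetical SizeOrTimeBatcher with an injectable clock. One difference: Flink's flush timer fires on a fixed schedule, whereas this simplified model measures the age of the oldest buffered row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical size-or-time batching buffer; a simplified model, not Flink's implementation.
class SizeOrTimeBatcher<T> {
    private final int maxRows;
    private final long maxIntervalMs;
    private final LongSupplier clockMs;          // injectable clock (milliseconds)
    private final Consumer<List<T>> flushTarget; // stands in for one INSERT into ClickHouse
    private final List<T> buffer = new ArrayList<>();
    private long firstRowAtMs;

    SizeOrTimeBatcher(int maxRows, long maxIntervalMs, LongSupplier clockMs, Consumer<List<T>> flushTarget) {
        this.maxRows = maxRows;
        this.maxIntervalMs = maxIntervalMs;
        this.clockMs = clockMs;
        this.flushTarget = flushTarget;
    }

    void add(T row) {
        if (buffer.isEmpty()) {
            firstRowAtMs = clockMs.getAsLong(); // batch age is measured from its first row
        }
        buffer.add(row);
        maybeFlush();
    }

    /** Called periodically (e.g. by a timer) so that quiet streams still flush on time. */
    void tick() {
        maybeFlush();
    }

    private void maybeFlush() {
        if (buffer.isEmpty()) {
            return;
        }
        boolean sizeReached = buffer.size() >= maxRows;
        boolean timeReached = clockMs.getAsLong() - firstRowAtMs >= maxIntervalMs;
        if (sizeReached || timeReached) {
            flushTarget.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        SizeOrTimeBatcher<String> batcher = new SizeOrTimeBatcher<>(200_000, 30_000L, () -> now[0],
                batch -> System.out.println("INSERT of " + batch.size() + " rows at t=" + now[0] + " ms"));
        batcher.add("row-1");
        batcher.add("row-2");
        now[0] = 10_000L;
        batcher.tick(); // nothing yet: 10 s old, far below 200,000 rows
        now[0] = 30_000L;
        batcher.tick(); // prints "INSERT of 2 rows at t=30000 ms"
    }
}
```

A larger interval means fewer, bigger inserts (fewer parts, less ZooKeeper traffic). A smaller one means fresher data.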
4.4 Distributed vs Local Tables
Writing to a distributed table routes each row to a shard based on the sharding key (e.g., sipHash64(docId)). This is simpler, but the node that receives the insert must forward rows to the other shards, which can make that node an I/O bottleneck. For our 30 M‑row daily volume, distributed writes performed adequately. Writing directly to local tables remains an option if bottlenecks appear.
5. CPU Hot‑Spot Analysis
One shard (node 7‑1) showed high CPU because of data skew. The sharding key hashed a low‑cardinality field (one with few distinct values), so many rows landed on the same shard. Switching to a hash of a business key resolved the imbalance.
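A Distributed table picks the shard from the remainder of the sharding expression divided by the total shard weight. A key with only a few distinct values therefore reaches only a few shards, no matter how good the hash is.

The Java sketch below illustrates both cases with 8 equal-weight shards. It uses FNV‑1a as a stand-in hash, because sipHash64 is not in the Java standard library and the two functions are not equivalent. The key formats ("station-N", "order-N") are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of hash-based shard routing and the skew caused by a low-cardinality key.
class ShardRouting {
    /** FNV-1a 64-bit hash: a stand-in for ClickHouse's sipHash64, NOT the same function. */
    static long fnv1a64(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }

    /** Equal-weight shards: shard = hash mod shardCount, treating the hash as unsigned. */
    static int shardFor(String key, int shardCount) {
        return (int) Long.remainderUnsigned(fnv1a64(key), shardCount);
    }

    /** Number of rows each shard receives when rows are routed by the given keys. */
    static int[] distribution(String[] keys, int shardCount) {
        int[] counts = new int[shardCount];
        for (String key : keys) {
            counts[shardFor(key, shardCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int shards = 8;
        int rows = 80_000;
        String[] byStation = new String[rows]; // low-cardinality key: only 3 distinct values
        String[] byOrder = new String[rows];   // business key: unique per row
        for (int i = 0; i < rows; i++) {
            byStation[i] = "station-" + (i % 3);
            byOrder[i] = "order-" + i;
        }
        // At most 3 shards receive rows; the other 5 stay empty.
        System.out.println("by station: " + Arrays.toString(distribution(byStation, shards)));
        // Every shard receives a share of the rows.
        System.out.println("by order:   " + Arrays.toString(distribution(byOrder, shards)));
    }
}
```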
Another node (7‑4) showed high CPU because queries scanned a very large data part in full: 202111_0_408188_322, that is, partition 202111, blocks 0 to 408188, merge level 322. Adding a secondary (data‑skipping) index on gridStationCode reduced the rows scanned from 73 M to 1.4 M and cut query time.
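The article does not say which index type was used on gridStationCode. The sketch below assumes a set-type data-skipping index, declared in ClickHouse with something like INDEX idx_station gridStationCode TYPE set(100) GRANULARITY 4 (the name and parameters are illustrative).

For each block of rows, such an index stores the distinct values it contains. Any block that cannot contain the filtered value is skipped. The Java model (SetSkipIndexSim is hypothetical) shows why the benefit depends on data layout: if each value's rows are stored close together, most blocks are skipped, but if every block contains every value, nothing is.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a `set` data-skipping index; ClickHouse's real implementation differs in detail.
class SetSkipIndexSim {
    /**
     * One summary per block of blockRows rows (think GRANULARITY x index_granularity):
     * the distinct values in the block, or null when there are more than maxValues
     * (then the summary is useless and the block can never be skipped).
     */
    static List<Set<String>> build(String[] column, int blockRows, int maxValues) {
        List<Set<String>> index = new ArrayList<>();
        for (int start = 0; start < column.length; start += blockRows) {
            Set<String> values = new HashSet<>();
            int end = Math.min(start + blockRows, column.length);
            for (int i = start; i < end && values != null; i++) {
                values.add(column[i]);
                if (values.size() > maxValues) {
                    values = null;
                }
            }
            index.add(values);
        }
        return index;
    }

    /** Rows that must be read for `WHERE column = value`: every block the index cannot rule out. */
    static long rowsToRead(int totalRows, int blockRows, List<Set<String>> index, String value) {
        long rows = 0;
        for (int b = 0; b < index.size(); b++) {
            Set<String> values = index.get(b);
            if (values == null || values.contains(value)) {
                rows += Math.min(blockRows, totalRows - b * blockRows);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        int stations = 100, rowsPerStation = 1_000, total = stations * rowsPerStation, blockRows = 500;
        String[] clustered = new String[total];   // same station stored contiguously
        String[] interleaved = new String[total]; // stations mixed in every block
        for (int i = 0; i < total; i++) {
            clustered[i] = "S" + (i / rowsPerStation);
            interleaved[i] = "S" + (i % stations);
        }
        long a = rowsToRead(total, blockRows, build(clustered, blockRows, 100), "S42");
        long b = rowsToRead(total, blockRows, build(interleaved, blockRows, 100), "S42");
        System.out.println("clustered: " + a + " rows, interleaved: " + b + " rows"); // 1000 vs 100000
    }
}
```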
6. Locating CPU‑Consuming SQL
Two approaches:
Grafana monitoring of query frequency to spot hot queries.
Analyzing system.query_log (or a distributed copy) to find queries with high read_rows or long query_duration_ms.
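In SQL, the second approach is a GROUP BY over system.query_log, filtered to finished queries (type = 'QueryFinish') and ordered by the sum of read_rows. The sketch below shows the same ranking logic in Java, over rows assumed to have been exported from query_log. HotQueryFinder, QueryLogRow, and QueryStats are hypothetical names, and the sample queries are placeholders.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical offline ranking of exported system.query_log rows (query, read_rows, query_duration_ms).
class HotQueryFinder {
    static List<QueryStats> topByReadRows(List<QueryLogRow> rows, int limit) {
        Map<String, QueryStats> byQuery = new HashMap<>();
        for (QueryLogRow r : rows) {
            QueryStats s = byQuery.computeIfAbsent(r.query, QueryStats::new);
            s.executions++;
            s.totalReadRows += r.readRows;
            s.maxDurationMs = Math.max(s.maxDurationMs, r.durationMs);
        }
        List<QueryStats> ranked = new ArrayList<>(byQuery.values());
        // Highest total read_rows first: these queries cost the most scanning work overall.
        ranked.sort(Comparator.comparingLong((QueryStats s) -> s.totalReadRows).reversed());
        return ranked.subList(0, Math.min(limit, ranked.size()));
    }

    public static void main(String[] args) {
        List<QueryLogRow> log = List.of(
                new QueryLogRow("placeholder query A", 50_000_000L, 4_000L),
                new QueryLogRow("placeholder query A", 50_000_000L, 3_000L),
                new QueryLogRow("placeholder query B", 10_000L, 5L));
        for (QueryStats s : topByReadRows(log, 10)) {
            System.out.println(s.executions + "x, " + s.totalReadRows + " rows, max "
                    + s.maxDurationMs + " ms: " + s.query);
        }
    }
}

class QueryLogRow {
    final String query;
    final long readRows;
    final long durationMs;

    QueryLogRow(String query, long readRows, long durationMs) {
        this.query = query;
        this.readRows = readRows;
        this.durationMs = durationMs;
    }
}

class QueryStats {
    final String query;
    long executions;
    long totalReadRows;
    long maxDurationMs;

    QueryStats(String query) {
        this.query = query;
    }
}
```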
7. Slow‑Query Optimization
7.1 Using Service Logs for Trace‑Level Analysis
Running clickhouse-client with --send_logs_level=trace prints server logs that reveal index usage, the parts scanned, and the aggregation steps. In our case, the logs showed the original query scanning 73 M rows (a full scan), while a version with a partition filter scanned only 1.4 M rows.
7.2 Table Design Recommendations
Avoid Nullable columns when possible; they prevent indexing and add storage overhead.
Choose partition granularity that matches query patterns (day or month, not minute).
Select shard keys that distribute data evenly (e.g., sipHash64(docId)).
7.3 Query‑Level Optimizations
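Use FINAL or argMax for deduplication; FINAL is generally faster.
Use PREWHERE for filter columns whose values do not change between row versions (e.g., a creation date). PREWHERE reads and evaluates those columns first and reads the remaining columns only for matching rows, which reduces I/O. With FINAL, PREWHERE may be evaluated before deduplication, so filtering on columns that change between versions can return stale rows.
Apply conditional aggregation (sumIf, countIf) to compute multiple metrics in a single scan.
Create data‑skipping ("secondary") indexes (minmax, set, bloom_filter) on frequently filtered columns.
Filter on the leading columns of the table's ORDER BY (sorting) key so the primary index can prune granules. The order in which conditions appear in WHERE / PREWHERE does not matter.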
Limit column selection; avoid SELECT * to reduce data read.
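The single-scan idea behind sumIf / countIf can be sketched in Java: one pass over the rows updates every conditional accumulator, instead of running one query, and one scan, per metric.

The Movement class and its fields are hypothetical, modeled on the columns used in the 7.4 example. Distinct outbound orders are tracked with a set, mirroring an exact distinct count.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-pass evaluation of several conditional metrics (the idea behind sumIf / countIf).
class ConditionalAggregation {
    long inboundQty;                                    // sum of qty where type = 'inbound'
    final Set<String> outboundOrders = new HashSet<>(); // distinct orderNo where type = 'outbound' and status = '1'
    long checkQty;                                      // sum of qty where type = 'check'

    static ConditionalAggregation scan(List<Movement> rows) {
        ConditionalAggregation agg = new ConditionalAggregation();
        for (Movement m : rows) { // one scan feeds every metric
            if (m.type.equals("inbound")) {
                agg.inboundQty += m.qty;
            }
            if (m.type.equals("outbound") && m.status.equals("1")) {
                agg.outboundOrders.add(m.orderNo);
            }
            if (m.type.equals("check")) {
                agg.checkQty += m.qty;
            }
        }
        return agg;
    }

    public static void main(String[] args) {
        ConditionalAggregation agg = scan(List.of(
                new Movement("A001", "inbound", "0", 5),
                new Movement("A002", "outbound", "1", 2),
                new Movement("A002", "outbound", "1", 3), // same order: counted once
                new Movement("A003", "check", "0", 7)));
        System.out.println(agg.inboundQty + " " + agg.outboundOrders.size() + " " + agg.checkQty); // 5 1 7
    }
}

class Movement {
    final String orderNo;
    final String type;
    final String status;
    final long qty;

    Movement(String orderNo, String type, String status, long qty) {
        this.orderNo = orderNo;
        this.type = type;
        this.status = status;
        this.qty = qty;
    }
}
```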
7.4 Example of Conditional Aggregation
SELECT
sumIf(qty, type = 'inbound') AS inbound_qty,
uniqExactIf(orderNo, type = 'outbound' AND status = '1') AS outbound_orders, -- distinct outbound orders
sumIf(qty, type = 'check') AS check_qty
FROM table_1
PREWHERE dt = '2021-01-01';
8. Ensuring High Concurrency and Availability
Reduce max_threads per query. By default it equals the number of physical CPU cores, so a single query can occupy the whole node; a lower value limits per‑query CPU consumption.
Introduce short‑term caching for hot aggregation results.
Offload aggregated metrics to Elasticsearch for fast reads.
Materialized views can pre‑aggregate data, but they were not suitable for our real‑time use case.
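Short-term caching of hot aggregation results can be as simple as a time-to-live map in front of the query. The Java sketch below is a hypothetical TtlCache with an injectable clock; it is not a component of the original system. The loader stands in for the real ClickHouse query.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical short-TTL cache for hot aggregation results; not part of the original system.
class TtlCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long expiresAtMs;

        Entry(T value, long expiresAtMs) {
            this.value = value;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMs;
    private final LongSupplier clockMs;

    TtlCache(long ttlMs, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
    }

    /**
     * Returns the cached value if it is younger than ttlMs; otherwise calls loader
     * (e.g. the aggregation query against ClickHouse) and caches the result.
     * The method is synchronized, so concurrent callers never issue duplicate loads;
     * the cost is that all loads are serialized (a per-key lock would avoid that).
     */
    synchronized V get(K key, Supplier<V> loader) {
        long now = clockMs.getAsLong();
        Entry<V> e = entries.get(key);
        if (e != null && now < e.expiresAtMs) {
            return e.value;
        }
        V value = loader.get();
        entries.put(key, new Entry<>(value, now + ttlMs));
        return value;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        int[] queries = {0};
        TtlCache<String, Long> cache = new TtlCache<>(5_000L, () -> now[0]);
        Supplier<Long> runQuery = () -> { queries[0]++; return 42L; }; // placeholder for a real query
        cache.get("dashboard:2021-01-01", runQuery);
        now[0] = 1_000L;
        cache.get("dashboard:2021-01-01", runQuery); // served from cache
        System.out.println("queries issued: " + queries[0]); // 1
    }
}
```

Even a TTL of a few seconds collapses many identical dashboard requests into one ClickHouse query, at the cost of slightly stale numbers.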
9. Conclusion
By migrating from Elasticsearch to ClickHouse, selecting ReplacingMergeTree with final queries, optimizing Flink sink batch settings, adding appropriate secondary indexes, and tuning query patterns, the system handles 30 M‑row daily increments with stable CPU usage (~60 % on a 32C/128G cluster) even during peak traffic.
JD Cloud Developers
JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news. Embrace technology and partner with developers to envision the future.