How Didi Built a Next‑Gen Log Storage System with ClickHouse
Didi migrated its massive PB‑scale log data from Elasticsearch to ClickHouse, redesigning storage with separate Log and Trace clusters, optimizing partition and sorting keys, introducing native TCP connectors, and revamping HDFS cold‑hot separation, achieving up to four‑fold query speed gains and 30% lower hardware costs.
ClickHouse, a high-performance columnar database open-sourced in 2016, offers vectorized execution, multi-core parallelism, and high compression ratios, making it one of the fastest engines for single-table analytical queries. Didi has run ClickHouse at scale since 2020 for ride-hailing analytics and log retrieval, and it has now replaced Elasticsearch (ES) as the core log store.
Background
ES had become a bottleneck: tokenization and index building capped write throughput, while storing raw text alongside inverted and forward indexes drove up storage and memory costs. As log volumes kept growing, ES could no longer keep up.
Challenges
Petabyte‑level daily log volume requiring stable real‑time ingestion.
Diverse query scenarios (exact, fuzzy, sorting) needing second‑level response on large scans.
High QPS for trace queries on PB‑scale data.
Why ClickHouse
Distributed architecture supports dynamic scaling for massive data.
MergeTree tables write at ~200 MB/s, eliminating write bottlenecks.
Partition pruning and sorting keys enable scan rates of millions of rows per second.
Columnar storage with high compression and HDFS‑based hot‑cold separation reduces storage cost.
Architecture Upgrade
The previous architecture duplicated logs to ES and HDFS, with ES handling real‑time queries and Spark analyzing HDFS data, doubling resource consumption and operational complexity.
The new design lets ClickHouse replace ES, creating isolated Log and Trace clusters. The Log cluster stores detailed logs; the Trace cluster stores trace data, preventing heavy log queries from affecting high‑QPS trace queries. Log data is written by Flink directly to the Log cluster; a materialized view extracts trace records into the Trace cluster, and background threads sync cold data to HDFS.
Storage Design
Log Table
Each service has its own Log table (thousands in total), written by Flink from Pulsar. The largest tables generate PB‑level data per day. To handle many tables, large data volume, and hot‑cold separation, the following schema is used:
CREATE TABLE ck_bamai_stream.cn_bmauto_local (
`logTime` Int64 DEFAULT 0,
`logTimeHour` DateTime MATERIALIZED toStartOfHour(toDateTime(logTime / 1000)),
`odinLeaf` String DEFAULT '',
`uri` LowCardinality(String) DEFAULT '',
`traceid` String DEFAULT '',
`cspanid` String DEFAULT '',
`dltag` String DEFAULT '',
`spanid` String DEFAULT '',
`message` String DEFAULT '',
`otherColumn` Map(String, String),
`_sys_insert_time` DateTime MATERIALIZED now()
) ENGINE = MergeTree
PARTITION BY toYYYYMMDD(logTimeHour)
ORDER BY (logTimeHour, odinLeaf, uri, traceid)
TTL _sys_insert_time + toIntervalDay(7), _sys_insert_time + toIntervalDay(3) TO VOLUME 'hdfs'
SETTINGS index_granularity = 8192, min_bytes_for_wide_part = 31457280;

Partition key: daily partitions (toYYYYMMDD), since most queries target a single hour; hourly partitioning would create too many parts and too many small HDFS files.
Sorting key: `logTimeHour` first (locking the hour-level range), then the lower-cardinality columns `odinLeaf` and `uri`, and finally `traceid`, enabling millisecond-level exact queries (see the query sketch after this list).
Map column: stores rarely‑filtered fields, reducing part count and avoiding many small HDFS files.
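To show how these choices line up with a typical request, here is a minimal query sketch; the hour, service node, URI, and traceid values are hypothetical. The `logTimeHour` predicate prunes the daily partition down to one hour, and the sorting-key prefix limits the scan to a few granules:

SELECT logTime, dltag, message
FROM ck_bamai_stream.cn_bmauto_local
WHERE logTimeHour = toDateTime('2024-05-01 10:00:00')  -- prunes to one day partition, one hour range
  AND odinLeaf = 'bmauto-hna-v001'                     -- hypothetical service node
  AND uri = '/v1/order/create'                         -- hypothetical endpoint
  AND traceid = '0a1b2c3d4e5f'                         -- hypothetical trace id
ORDER BY logTime
LIMIT 1000;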
Trace Table
The Trace table serves high-QPS trace queries and resides in the Trace cluster. Its data is populated from the Log tables through cross-cluster materialized views.
CREATE TABLE ck_bamai_stream.trace_view (
`traceid` String,
`spanid` String,
`clientHost` String,
`logTimeHour` DateTime,
`cspanid` AggregateFunction(groupUniqArray, String),
`appName` SimpleAggregateFunction(any, String),
`logTimeMin` SimpleAggregateFunction(min, Int64),
`logTimeMax` SimpleAggregateFunction(max, Int64),
`dltag` AggregateFunction(groupUniqArray, String),
`uri` AggregateFunction(groupUniqArray, String),
`errno` AggregateFunction(groupUniqArray, String),
`odinLeaf` SimpleAggregateFunction(any, String),
`extractLevel` SimpleAggregateFunction(any, String)
) ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMMDD(logTimeHour)
ORDER BY (logTimeHour, traceid, spanid, clientHost)
TTL logTimeHour + toIntervalDay(7)
SETTINGS index_granularity = 1024;

AggregatingMergeTree aggregates rows by `traceid`, achieving roughly 5:1 compression and faster retrieval.
Partition and sorting keys mirror the Log design for consistency.
Reduced index granularity (1024) minimizes scanning of irrelevant blocks.
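The article does not show the materialized view itself; the sketch below is one plausible shape for it, assuming `clientHost`, `appName`, `errno`, and `extractLevel` come from the `otherColumn` map of the Log table, and omitting the cross-cluster wiring (for example, a Distributed target table pointing at the Trace cluster):

CREATE MATERIALIZED VIEW ck_bamai_stream.cn_bmauto_trace_mv
TO ck_bamai_stream.trace_view AS
SELECT
    traceid,
    spanid,
    otherColumn['clientHost'] AS clientHost,              -- assumed mapping
    logTimeHour,
    groupUniqArrayState(cspanid) AS cspanid,               -- -State feeds the AggregateFunction column
    any(otherColumn['appName']) AS appName,                -- SimpleAggregateFunction columns take plain values
    min(logTime) AS logTimeMin,
    max(logTime) AS logTimeMax,
    groupUniqArrayState(dltag) AS dltag,
    groupUniqArrayState(uri) AS uri,
    groupUniqArrayState(otherColumn['errno']) AS errno,    -- assumed mapping
    any(odinLeaf) AS odinLeaf,
    any(otherColumn['extractLevel']) AS extractLevel       -- assumed mapping
FROM ck_bamai_stream.cn_bmauto_local
GROUP BY traceid, spanid, clientHost, logTimeHour;

Reading the view then uses -Merge combinators on the AggregateFunction columns, for example:

SELECT
    traceid,
    groupUniqArrayMerge(uri) AS uris,
    min(logTimeMin) AS firstLogTime,
    max(logTimeMax) AS lastLogTime
FROM ck_bamai_stream.trace_view
WHERE logTimeHour = toDateTime('2024-05-01 10:00:00')
  AND traceid = '0a1b2c3d4e5f'                             -- hypothetical trace id
GROUP BY traceid;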
Trace Index Table
Accelerates lookups of `order_id`, `driver_id`, `driver_phone` etc. by materializing an aggregated view.
CREATE TABLE orderid_traceid_index_view (
`order_id` String,
`traceid` String,
`logTimeHour` DateTime
) ENGINE = AggregatingMergeTree
PARTITION BY logTimeHour
ORDER BY (order_id, traceid)
TTL logTimeHour + toIntervalDay(7)
SETTINGS index_granularity = 1024;

Stability Journey
Large‑Cluster Small‑Table Fragmentation
90% of Log tables have < 10 MB/s traffic. Writing all tables across hundreds of nodes caused massive small‑table fragmentation, hurting query performance and creating many tiny HDFS files.
Solution: Dynamically assign each table a set of write nodes (from two up to the full cluster, based on its traffic) and have Flink route data only to those nodes, curbing data dispersion.
Write Throttling and Performance Boost
During peak hours, Flink implements dynamic write throttling per table, protecting critical tables from overload.
A native TCP connector was built to replace HTTP and cut network overhead, with custom serialization that outperforms the previously used Parquet format. After enabling it, the write-latency rate dropped from 20% to 5% and overall write throughput rose roughly 1.2-fold.
HDFS Cold‑Hot Separation Issues
Problems encountered:
Service restarts were slow and CPU‑bound due to libhdfs3’s poor concurrent metadata reads.
Direct writes to HDFS for historical partitions suffered terrible performance and required local merges.
All tables shared a single HDFS directory, hitting the 1 million‑file limit.
Node failures could lose local path‑to‑HDFS mappings, causing data loss.
Remediation: Cache HDFS part metadata locally, reducing restart time from 1 hour to 1 minute; enforce local write‑then‑merge before uploading to HDFS; restructure HDFS paths as cluster/shard/database/table/ and back up path mappings per table, enabling recovery after node loss.
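For context, the `TO VOLUME 'hdfs'` TTL clause in the Log table depends on a storage policy that declares an HDFS-backed volume; a minimal server-side configuration sketch follows (disk names, paths, and the NameNode endpoint are assumptions, not Didi's actual settings, and the table would additionally need a matching `storage_policy` setting or a default policy):

<clickhouse>
    <storage_configuration>
        <disks>
            <hdfs_cold>
                <type>hdfs</type>
                <endpoint>hdfs://namenode:8020/clickhouse/cold/</endpoint>  <!-- assumed address -->
            </hdfs_cold>
        </disks>
        <policies>
            <hot_cold_hdfs>
                <volumes>
                    <hot>
                        <disk>default</disk>        <!-- local disks hold the most recent data -->
                    </hot>
                    <hdfs>                          <!-- volume name referenced by TO VOLUME 'hdfs' -->
                        <disk>hdfs_cold</disk>
                    </hdfs>
                </volumes>
            </hot_cold_hdfs>
        </policies>
    </storage_configuration>
</clickhouse>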
Benefits
The migration is complete: the ClickHouse log cluster now exceeds 400 physical nodes, sustains >40 GB/s write peaks, handles ~15 million daily queries, and peaks at ~200 QPS. Compared with ES, hardware cost dropped by 30% and query latency improved roughly four‑fold (P99 < 1 s for both log and trace clusters).
Conclusion
Migrating logs from ES to ClickHouse dramatically cuts storage cost and delivers faster queries. After a year of construction and tuning, stability and performance have markedly improved, though fuzzy queries still consume considerable resources. Future work includes exploring secondary indexes, ZSTD compression, and compute‑storage separation to further boost log retrieval.
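As a pointer to those directions, data-skipping (secondary) indexes and column-level ZSTD codecs are declared like this in ClickHouse; the parameters below are illustrative, not Didi's chosen values:

-- Token bloom-filter index to accelerate fuzzy (token/substring) searches on message
ALTER TABLE ck_bamai_stream.cn_bmauto_local
    ADD INDEX message_token_idx message TYPE tokenbf_v1(30720, 3, 0) GRANULARITY 4;

-- Recompress the heaviest column with ZSTD instead of the default LZ4
ALTER TABLE ck_bamai_stream.cn_bmauto_local
    MODIFY COLUMN message String CODEC(ZSTD(3));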