Migrating Didi's Log Retrieval from Elasticsearch to ClickHouse: Architecture, Challenges, and Performance Optimizations
Didi replaced its Elasticsearch-based log platform with ClickHouse, redesigning the architecture into isolated Log and Trace clusters. Day-partitioned MergeTree tables sorted by hour-rounded timestamps, together with aggregating materialized views, handle petabyte-scale writes, diverse low-latency queries, and high QPS. The deployment now exceeds 400 nodes and 40 GB/s of peak write throughput, with about 30% lower machine cost and roughly four-fold lower query latency.
ClickHouse is a high-performance, distributed columnar database that was open-sourced in 2016. It supports vectorized execution, multi-core parallelism and high compression, making single-table analytical queries extremely fast. Since 2020 Didi has been migrating its core log-retrieval platform from Elasticsearch (ES) to ClickHouse (CK) to serve ride-hailing and other services.
Background
Previously Didi stored logs in ES. ES’s tokenization, inverted and forward indexes caused a clear write‑throughput bottleneck, high storage cost (original text + inverted + forward indexes) and heavy memory usage. As data volume grew, ES could no longer meet performance requirements.
To reduce cost and improve efficiency Didi evaluated alternatives and chose CK, following industry examples from JD, Ctrip, Bilibili, etc.
Challenges
The migration faced three main challenges:
Massive data volume: PB-level logs per day require a storage system that can sustain real-time writes and storage at that scale.
Diverse query scenarios: Equality, fuzzy, and sorting queries over large data sets must return results within seconds.
High QPS: Trace queries must handle high QPS even with PB-scale data.
Why ClickHouse
Large-scale data: Distributed architecture with dynamic scaling.
Write performance: MergeTree tables can ingest ~200 MB/s per node.
Query performance: Partition and sorting indexes enable scanning millions of rows per second on a single node.
Storage cost: Columnar storage and high compression, combined with HDFS cold-hot separation, lower storage expenses.
Architecture Upgrade
The legacy architecture duplicated writes to both ES and HDFS; ES handled real‑time queries while Spark processed HDFS data. This required maintaining two independent write paths.
In the new architecture CK replaces ES. Two isolated clusters are created: a Log cluster for detailed logs and a Trace cluster for trace data. The clusters are physically separated to avoid interference between heavy log queries and high‑QPS trace queries. Log data are written directly from Flink to the Log cluster; trace records are materialized via a view and asynchronously written to the Trace cluster. A single write path simplifies operations.
Storage Design
To maximize retrieval speed, logTime is rounded to the hour (logTimeHour) and data are stored in hourly order. This enables reading only one or two index blocks for a one‑hour query.
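The effect of hour rounding on index reads can be illustrated with a small Python sketch. It is not Didi's or ClickHouse's code: the function names are hypothetical, the rows are modeled as a plain sorted list, and a "granule" here simply means a fixed-size run of consecutive rows, as controlled by index_granularity. The rounding matches toStartOfHour for time zones whose UTC offset is a whole number of hours.

```python
from bisect import bisect_left, bisect_right

HOUR_MS = 3_600_000  # milliseconds per hour


def log_time_hour(log_time_ms: int) -> int:
    """Round an epoch-millisecond timestamp down to the start of its hour (epoch seconds)."""
    return (log_time_ms // HOUR_MS) * 3600


def granules_for_hour(sorted_hours, hour, granularity=8192):
    """Return the granule indexes that can contain rows for `hour`.

    sorted_hours: the logTimeHour value of every row, already in sort order.
    granularity:  rows per granule (hypothetical stand-in for index_granularity).
    """
    first_row = bisect_left(sorted_hours, hour)
    last_row = bisect_right(sorted_hours, hour)
    if first_row == last_row:  # no row carries this hour
        return range(0)
    return range(first_row // granularity, (last_row - 1) // granularity + 1)
```

Because every row in the same hour shares one logTimeHour value, the matching rows form a single contiguous run, so only the granules that overlap that run need to be read.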
Log Table Schema
CREATE TABLE ck_bamai_stream.cn_bmauto_local (
    `logTime` Int64 DEFAULT 0,  -- log timestamp in epoch milliseconds
    `logTimeHour` DateTime MATERIALIZED toStartOfHour(toDateTime(logTime / 1000)),  -- logTime rounded down to the hour
    `odinLeaf` String DEFAULT '',
    `uri` LowCardinality(String) DEFAULT '',
    `traceid` String DEFAULT '',
    `cspanid` String DEFAULT '',
    `dltag` String DEFAULT '',
    `spanid` String DEFAULT '',
    `message` String DEFAULT '',
    `otherColumn` Map(String, String),  -- type parameters were lost in the source; String keys and values are assumed
    `_sys_insert_time` DateTime MATERIALIZED now()
) ENGINE = MergeTree
PARTITION BY toYYYYMMDD(logTimeHour)
ORDER BY (logTimeHour, odinLeaf, uri, traceid)
-- Rows are deleted 7 days after insert; parts move to the 'hdfs' volume after 3 days
TTL _sys_insert_time + toIntervalDay(7), _sys_insert_time + toIntervalDay(3) TO VOLUME 'hdfs'
SETTINGS index_granularity = 8192, min_bytes_for_wide_part = 31457280;
Key design points:
Partition key: Daily partition (toYYYYMMDD) to avoid excessive part count.
Sorting key: logTimeHour first, followed by odinLeaf, uri and traceid in order of increasing cardinality, to limit index scans.
Map column: Dynamic schema via a Map column reduces the number of physical columns and the number of small HDFS files.
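The two TTL rules in the log table (move to the 'hdfs' volume after 3 days, delete after 7 days) amount to a three-way decision on data age. The Python sketch below only models that decision. The function name and the tier label "local" are hypothetical, and ClickHouse itself applies TTL rules in the background during merges, so the real transitions are not instantaneous.

```python
from datetime import datetime, timedelta


def storage_tier(insert_time: datetime, now: datetime,
                 move_after_days: int = 3, delete_after_days: int = 7) -> str:
    """Classify data by age, mirroring the table's two TTL expressions.

    Returns "local" (hot storage), "hdfs" (cold volume) or "expired".
    """
    age = now - insert_time
    if age >= timedelta(days=delete_after_days):
        return "expired"  # TTL _sys_insert_time + 7 days
    if age >= timedelta(days=move_after_days):
        return "hdfs"     # TTL _sys_insert_time + 3 days TO VOLUME 'hdfs'
    return "local"
```

With these defaults, data spends its first three days on local disks, the next four on HDFS, and is then removed.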
Trace Table Schema
CREATE TABLE ck_bamai_stream.trace_view (
    `traceid` String,
    `spanid` String,
    `clientHost` String,
    `logTimeHour` DateTime,
    `cspanid` AggregateFunction(groupUniqArray, String),
    `appName` SimpleAggregateFunction(any, String),
    `logTimeMin` SimpleAggregateFunction(min, Int64),
    `logTimeMax` SimpleAggregateFunction(max, Int64),
    `dltag` AggregateFunction(groupUniqArray, String),
    `uri` AggregateFunction(groupUniqArray, String),
    `errno` AggregateFunction(groupUniqArray, String),
    `odinLeaf` SimpleAggregateFunction(any, String),
    `extractLevel` SimpleAggregateFunction(any, String)
) ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMMDD(logTimeHour)
ORDER BY (logTimeHour, traceid, spanid, clientHost)
TTL logTimeHour + toIntervalDay(7)
SETTINGS index_granularity = 1024;
The table uses AggregatingMergeTree to compress trace records (≈5:1) and provides fast trace‑id lookups.
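To make the compression effect concrete, the sketch below models in Python what merging does logically for this table: rows that share the sorting key (logTimeHour, traceid, spanid, clientHost) collapse into one row, with min, max, any and unique-set semantics per column. It is an illustration only. The function name and the sample field values in the usage are hypothetical, only a subset of columns is modeled, and ClickHouse stores intermediate aggregate states rather than Python sets.

```python
def merge_trace_rows(rows):
    """Collapse raw span log rows into one aggregated row per sorting key."""
    merged = {}
    for row in rows:
        key = (row["logTimeHour"], row["traceid"], row["spanid"], row["clientHost"])
        acc = merged.get(key)
        if acc is None:
            merged[key] = {
                "appName": row["appName"],      # any(): keep the first value seen
                "logTimeMin": row["logTime"],   # min()
                "logTimeMax": row["logTime"],   # max()
                "dltag": {row["dltag"]},        # groupUniqArray(): distinct values
                "uri": {row["uri"]},
            }
        else:
            acc["logTimeMin"] = min(acc["logTimeMin"], row["logTime"])
            acc["logTimeMax"] = max(acc["logTimeMax"], row["logTime"])
            acc["dltag"].add(row["dltag"])
            acc["uri"].add(row["uri"])
    return merged
```

A span that emits several log lines therefore ends up as a single trace row, which is where a ratio such as the reported 5:1 would come from.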
Trace Index Table Schema
CREATE TABLE orderid_traceid_index_view (
    `order_id` String,
    `traceid` String,
    `logTimeHour` DateTime
) ENGINE = AggregatingMergeTree
PARTITION BY logTimeHour
ORDER BY (order_id, traceid)
TTL logTimeHour + toIntervalDay(7)
SETTINGS index_granularity = 1024;
This table accelerates queries that map order_id, driver_id, etc., to traceid via materialized views.
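The index table turns a business-key search into a two-step lookup: first resolve order_id to its traceids, then fetch spans by traceid. The Python sketch below models that access pattern with in-memory dictionaries. The class and function names are hypothetical and stand in for queries against the index table and the trace table.

```python
from collections import defaultdict


class TraceIndex:
    """In-memory stand-in for the order_id -> traceid index table."""

    def __init__(self):
        self._by_order = defaultdict(set)

    def add(self, order_id: str, traceid: str) -> None:
        self._by_order[order_id].add(traceid)

    def traceids(self, order_id: str) -> list:
        # Sorted for a stable result order; unknown order_ids yield an empty list.
        return sorted(self._by_order.get(order_id, ()))


def find_spans(index: TraceIndex, spans_by_trace: dict, order_id: str) -> list:
    """Step 1: order_id -> traceids. Step 2: traceid -> spans."""
    return [span
            for traceid in index.traceids(order_id)
            for span in spans_by_trace.get(traceid, [])]
```

Both steps are key lookups on a sorting-key prefix, which avoids scanning message text for the order_id.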
Stability Journey
Supporting the log scenario required handling massive write traffic and a large cluster. After a year of construction Didi can sustain holiday peaks.
Large‑Cluster Small‑Table Fragmentation
About 90% of Log tables receive less than 10 MB/s of traffic. Writing every table to all of the hundreds of nodes fragmented the data and produced many small HDFS files. Didi now allocates write nodes per table dynamically (from a minimum of 2 up to the maximum), and Flink fetches each table's node list before writing, which reduces fragmentation.
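One way to size a table's write-node set is to scale it with traffic and clamp it between 2 and the cluster size. The Python sketch below shows that idea under stated assumptions: the function name, the per-node budget of 50 MB/s, and the hash-based choice of starting node are all hypothetical, since the article does not describe Didi's actual allocation policy.

```python
import math
import zlib


def allocate_write_nodes(table: str, traffic_mb_s: float, all_nodes: list,
                         per_node_mb_s: float = 50.0, min_nodes: int = 2) -> list:
    """Pick a stable subset of nodes for one table, sized by its write traffic."""
    if not all_nodes:
        raise ValueError("all_nodes must not be empty")
    wanted = max(min_nodes, math.ceil(traffic_mb_s / per_node_mb_s))
    count = min(wanted, len(all_nodes))  # never more than the cluster has
    # A deterministic hash spreads small tables across different starting nodes.
    start = zlib.crc32(table.encode("utf-8")) % len(all_nodes)
    return [all_nodes[(start + i) % len(all_nodes)] for i in range(count)]
```

A writer such as Flink would then send a table's batches only to the returned nodes, so a low-traffic table produces a few larger parts instead of hundreds of tiny ones.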
Write Throttling and Performance Boost
Flink applies dynamic write-rate limiting to protect critical tables during traffic spikes. A native TCP connector was built to replace the HTTP one, reducing network overhead, and custom serialization improved efficiency further. Write delay, reported as a percentage, dropped from 20% to 5%, and overall write throughput increased by 1.2×.
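Write-rate limiting of this kind is commonly built on a token bucket. The article does not say which algorithm Didi's Flink job uses, so the Python sketch below is a generic token bucket with hypothetical names. The clock is passed in explicitly so the behavior is deterministic, and the rate attribute can be changed at runtime to model the "dynamic" part.

```python
class TokenBucket:
    """Byte-based token bucket: allows bursts up to `burst_bytes`, then `rate` bytes/s."""

    def __init__(self, rate_bytes_per_s: float, burst_bytes: float, now: float = 0.0):
        self.rate = rate_bytes_per_s  # may be adjusted at runtime
        self.capacity = burst_bytes
        self.tokens = burst_bytes     # start with a full bucket
        self.last = now

    def try_acquire(self, nbytes: float, now: float) -> bool:
        """Return True and consume tokens if a batch of `nbytes` may be written now."""
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = max(self.last, now)
        if nbytes <= self.tokens:
            self.tokens -= nbytes
            return True
        return False
```

Under load, a controller could lower the rate of non-critical tables so that critical tables keep their full budget; a rejected batch would be delayed and retried rather than dropped.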
HDFS Cold‑Hot Separation Issues
Four major problems were observed:
Service restarts were slow because libhdfs3 concurrently loaded millions of Part metadata, causing heavy sys‑cpu usage.
Writing historical partitions directly to HDFS bypassed local merge, leading to poor write performance.
All tables shared a single HDFS directory, hitting the 1 M file limit.
Path‑mapping metadata stored locally could be lost on node failure, causing data loss.
Solutions included caching Part metadata locally (startup reduced from 1 hour to 1 minute), enforcing local write‑then‑merge before HDFS upload, and reorganizing HDFS paths by cluster/shard/database/table with per‑table backup of mapping information.
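The reorganized directory layout can be sketched as a path-building function in Python. The cluster/shard/database/table hierarchy comes from the text above; the function name, the root directory, the "shard_N" naming and the sample part name are hypothetical.

```python
from pathlib import PurePosixPath


def hdfs_part_path(root: str, cluster: str, shard: int,
                   database: str, table: str, part_name: str) -> str:
    """Build a per-table HDFS path so no single directory collects every part."""
    path = PurePosixPath(root) / cluster / f"shard_{shard}" / database / table / part_name
    return str(path)
```

Spreading parts over one directory per table keeps each directory far below the per-directory file limit, and it gives the per-table mapping backup a natural place to live.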
Benefits
After the migration, the CK log cluster runs on more than 400 physical nodes, sustains peak write throughput above 40 GB/s, and serves about 15 M queries per day at a peak of roughly 200 QPS. Machine cost dropped by about 30% compared with ES, and query latency improved about 4× (P99 < 1 s).
Conclusion
Migrating logs from ES to CK reduces storage cost and delivers faster queries. After a year of optimization the system is stable and performant, though fuzzy queries still consume significant resources. Future work will explore secondary indexes, ZSTD compression and compute‑storage separation.
Didi Tech
Official Didi technology account