
How Didi Scaled Log Search by Replacing Elasticsearch with ClickHouse

Facing PB‑scale daily logs and costly Elasticsearch bottlenecks, Didi redesigned its log‑search architecture by migrating to ClickHouse, detailing the challenges, storage redesign, cluster upgrades, performance optimizations, stability fixes, and the resulting cost reduction and query speed gains.

dbaplus Community

Background

ClickHouse is an open‑source columnar distributed database designed for real‑time analytics. Didi migrated its petabyte‑scale ride‑hailing and log‑retrieval platforms from Elasticsearch (ES) to ClickHouse in 2020 because ES’s inverted‑index architecture caused write‑throughput bottlenecks, high storage costs, and excessive memory usage.

Challenges

Massive data volume: Daily log generation reaches petabyte scale, requiring a storage system that can ingest data in real time.

Diverse query patterns: Equality, fuzzy, and sorting queries must return results within seconds, often scanning large data ranges.

High QPS: Trace queries must sustain very high query‑per‑second rates despite the data size.

Why ClickHouse

Scalable architecture: Distributed nodes can be added or removed dynamically to accommodate massive datasets.

High write performance: MergeTree tables achieve ~200 MB/s write speed, eliminating the previous write bottleneck.

Efficient query execution: Partition and sorting indexes let a single node scan millions of rows per second.

Cost‑effective storage: Columnar compression and HDFS‑based hot‑cold separation reduce storage expenses.

Architecture Upgrade

The legacy pipeline duplicated logs to both ES and HDFS, with Spark processing HDFS data. The new design replaces ES with two isolated ClickHouse clusters:

Log cluster: Stores detailed log records.

Trace cluster: Stores trace data extracted from logs.

Flink writes logs directly to the Log cluster. Materialized views extract trace fields and asynchronously write them to the Trace cluster. A background thread periodically syncs cold data from ClickHouse to HDFS.

Storage Design

To minimize read latency, log timestamps are truncated to the start of the hour (field logTimeHour) and rows are stored in hour order. Most hour‑range queries therefore need to read at most two index blocks.

Log Table

CREATE TABLE ck_bamai_stream.cn_bmauto_local (
    `logTime` Int64 DEFAULT 0,
    `logTimeHour` DateTime MATERIALIZED toStartOfHour(toDateTime(logTime / 1000)),
    `odinLeaf` String DEFAULT '',
    `uri` LowCardinality(String) DEFAULT '',
    `traceid` String DEFAULT '',
    `cspanid` String DEFAULT '',
    `dltag` String DEFAULT '',
    `spanid` String DEFAULT '',
    `message` String DEFAULT '',
    `otherColumn` Map(String, String),
    `_sys_insert_time` DateTime MATERIALIZED now()
) ENGINE = MergeTree
PARTITION BY toYYYYMMDD(logTimeHour)
ORDER BY (logTimeHour, odinLeaf, uri, traceid)
TTL _sys_insert_time + toIntervalDay(7), _sys_insert_time + toIntervalDay(3) TO VOLUME 'hdfs'
SETTINGS index_granularity = 8192, min_bytes_for_wide_part = 31457280;

Key design points:

Partition key: Daily partitions keep the part count manageable while still supporting hour‑range queries.

Sorting key: logTimeHour comes first, followed by odinLeaf, uri, and traceid in order of increasing cardinality, so that queries filtering on these columns read only a narrow range of granules and can return in milliseconds.

Map column: Dynamic schema fields are stored in a single Map(String, String) column, which reduces the number of physical columns and avoids creating many small HDFS files.
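A query shaped to this layout might look like the following minimal sketch. Only the table and column names come from the DDL above; the time window, the odinLeaf value, the search term, and the 'errno' map key are made-up placeholders.

```sql
-- Sketch: recent log lines for one service node within a two-hour window.
-- All literal values and the 'errno' map key are hypothetical placeholders.
SELECT
    logTime,
    uri,
    traceid,
    message,
    otherColumn['errno'] AS errno                        -- dynamic field read from the Map column
FROM ck_bamai_stream.cn_bmauto_local
WHERE logTimeHour >= toDateTime('2023-06-01 10:00:00')   -- leading sorting-key column
  AND logTimeHour <  toDateTime('2023-06-01 12:00:00')
  AND odinLeaf = 'hypothetical.service.node'             -- second sorting-key column
  AND message LIKE '%timeout%'                           -- fuzzy match on the remaining rows
ORDER BY logTime DESC
LIMIT 100;
```

Filtering on logTimeHour and odinLeaf first lets the primary index narrow the scan before the more expensive LIKE predicate runs.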

Trace Table

CREATE TABLE ck_bamai_stream.trace_view (
    `traceid` String,
    `spanid` String,
    `clientHost` String,
    `logTimeHour` DateTime,
    `cspanid` AggregateFunction(groupUniqArray, String),
    `appName` SimpleAggregateFunction(any, String),
    `logTimeMin` SimpleAggregateFunction(min, Int64),
    `logTimeMax` SimpleAggregateFunction(max, Int64),
    `dltag` AggregateFunction(groupUniqArray, String),
    `uri` AggregateFunction(groupUniqArray, String),
    `errno` AggregateFunction(groupUniqArray, String),
    `odinLeaf` SimpleAggregateFunction(any, String),
    `extractLevel` SimpleAggregateFunction(any, String)
) ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMMDD(logTimeHour)
ORDER BY (logTimeHour, traceid, spanid, clientHost)
TTL logTimeHour + toIntervalDay(7)
SETTINGS index_granularity = 1024;

The table uses AggregatingMergeTree to merge rows that share a sorting key, collapsing trace records at roughly 5:1. It uses the same daily partition key as the Log table, and its sorting key likewise leads with logTimeHour.
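The article does not show the materialized view that feeds this table. The sketch below is one way it could be written in stock ClickHouse. The view name trace_mv is an assumption. So is the idea that clientHost, appName, errno, and extractLevel are read from otherColumn under those keys. The cross‑cluster write path is omitted.

```sql
-- Hypothetical materialized view; its name and the otherColumn keys are assumptions.
CREATE MATERIALIZED VIEW ck_bamai_stream.trace_mv
TO ck_bamai_stream.trace_view
AS
SELECT
    traceid,
    spanid,
    otherColumn['clientHost']                         AS clientHost,
    toStartOfHour(toDateTime(intDiv(logTime, 1000)))  AS logTimeHour,
    groupUniqArrayState(cspanid)                      AS cspanid,   -- partial state for AggregateFunction columns
    any(otherColumn['appName'])                       AS appName,   -- plain value for SimpleAggregateFunction columns
    min(logTime)                                      AS logTimeMin,
    max(logTime)                                      AS logTimeMax,
    groupUniqArrayState(dltag)                        AS dltag,
    groupUniqArrayState(CAST(uri AS String))          AS uri,       -- strip LowCardinality to match the target type
    groupUniqArrayState(otherColumn['errno'])         AS errno,
    any(odinLeaf)                                     AS odinLeaf,
    any(otherColumn['extractLevel'])                  AS extractLevel
FROM ck_bamai_stream.cn_bmauto_local
GROUP BY traceid, spanid, clientHost, logTimeHour;

-- Reading back: finalize partial states with the -Merge combinator.
SELECT
    traceid,
    spanid,
    groupUniqArrayMerge(uri) AS uris,
    min(logTimeMin)          AS firstSeenMs,
    max(logTimeMax)          AS lastSeenMs
FROM ck_bamai_stream.trace_view
WHERE logTimeHour >= toDateTime('2023-06-01 10:00:00')
  AND logTimeHour <  toDateTime('2023-06-01 12:00:00')
  AND traceid = 'hypothetical-trace-id'
GROUP BY traceid, spanid;
```

Because background merges are asynchronous, rows with the same sorting key may not yet be collapsed at read time, which is why the read query aggregates again.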

Trace Index Table

CREATE TABLE orderid_traceid_index_view (
    `order_id` String,
    `traceid` String,
    `logTimeHour` DateTime
) ENGINE = AggregatingMergeTree
PARTITION BY logTimeHour
ORDER BY (order_id, traceid)
TTL logTimeHour + toIntervalDay(7)
SETTINGS index_granularity = 1024;

This table speeds up lookups of traceid by business keys such as order_id and driver_id. It is populated by materialized views that fire on inserts into the Log tables.
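A lookup through this index could be sketched as follows. The order ID and time bound are placeholders, and the second query assumes, purely for illustration, that the index table and trace_view can be queried from the same node.

```sql
-- Step 1: resolve an order to its trace IDs (placeholder order value).
SELECT DISTINCT traceid
FROM orderid_traceid_index_view
WHERE order_id = 'hypothetical-order-id'
  AND logTimeHour >= toDateTime('2023-06-01 00:00:00');

-- Step 2: fetch the matching spans, reusing the lookup as a subquery.
SELECT
    traceid,
    spanid,
    min(logTimeMin) AS firstSeenMs,
    max(logTimeMax) AS lastSeenMs
FROM ck_bamai_stream.trace_view
WHERE logTimeHour >= toDateTime('2023-06-01 00:00:00')
  AND traceid IN (
      SELECT traceid
      FROM orderid_traceid_index_view
      WHERE order_id = 'hypothetical-order-id'
        AND logTimeHour >= toDateTime('2023-06-01 00:00:00')
  )
GROUP BY traceid, spanid;
```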

Stability Improvements

Fragmentation of small tables: Each table is dynamically assigned between two write nodes and the cluster maximum, based on its traffic, which prevents a flood of tiny parts and the resulting HDFS small‑file problems.

Write throttling and performance: Flink‑side throttling protects critical tables during traffic spikes. A native ClickHouse TCP connector with custom serialization reduced the share of delayed writes from 20% to 5% and improved write performance by roughly 1.2×.

HDFS Cold‑Hot Separation Issues

Problems such as slow service restarts, high sys‑cpu usage, and directory‑entry limits were addressed by:

Caching HDFS part metadata locally to speed up node restarts (from ~1 hour to ~1 minute).

Changing the cold‑data path to write locally first, then merge and upload to HDFS, eliminating direct writes to HDFS for historical partitions.

Reorganizing HDFS storage into cluster/shard/database/table/ hierarchies and backing up local path‑mapping files to HDFS, enabling recovery after node failures.
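To check how parts are spread across hot and cold storage, stock ClickHouse exposes part placement in system.parts. The query below is a minimal sketch under the assumption that Didi's HDFS volume shows up there like any other disk; the article does not confirm this.

```sql
-- Sketch: per-partition part count and size on each disk for the Log table.
SELECT
    partition,
    disk_name,
    count()                                AS parts,
    formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE database = 'ck_bamai_stream'
  AND table = 'cn_bmauto_local'
  AND active                              -- ignore parts already replaced by merges
GROUP BY partition, disk_name
ORDER BY partition, disk_name;
```

With the 3‑day TTL move rule from the Log table DDL, partitions older than three days would be expected to appear on the disk backing the 'hdfs' volume.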

Figure: HDFS cold‑hot redesign

Results

After migration, the ClickHouse log clusters comprise >400 physical nodes, sustain write peaks >40 GB/s, handle ~15 million daily queries, and reach QPS peaks around 200. Compared with ES, machine cost dropped ~30 % and query latency improved roughly fourfold; most P99 queries complete within one second.

Figure: Query latency comparison
Figure: P99 latency chart

Future Work

Remaining challenges include high resource consumption for fuzzy queries. Planned improvements are secondary indexes, ZSTD compression, and compute‑storage separation to further enhance log retrieval performance.
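The article does not say which index type or compression level is planned. In stock ClickHouse, changes of this kind could look like the sketch below; the index name, the tokenbf_v1 parameters, and the ZSTD level are illustrative assumptions, not Didi's settings.

```sql
-- Hypothetical token bloom-filter index to help text searches on message.
ALTER TABLE ck_bamai_stream.cn_bmauto_local
    ADD INDEX idx_message_tokens message TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 4;

-- Build the index for parts that already exist; new parts get it on write.
ALTER TABLE ck_bamai_stream.cn_bmauto_local
    MATERIALIZE INDEX idx_message_tokens;

-- Store the message column with ZSTD; existing parts pick it up as they are merged.
ALTER TABLE ck_bamai_stream.cn_bmauto_local
    MODIFY COLUMN message String DEFAULT '' CODEC(ZSTD(3));
```

A token bloom filter only lets ClickHouse skip granules that cannot contain a whole token, so it helps word‑level searches more than arbitrary substring matches.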
