How ClickHouse Local Join Cuts Query Time and Memory Usage in Supply‑Chain Planning
This article explains how moving aggregation logic from in‑memory processing to ClickHouse SQL, synchronizing configuration data, and leveraging ClickHouse ReplacingMergeTree tables with local joins dramatically reduce query latency and memory consumption for large‑scale supply‑chain planning workloads.
Introduction
This article discusses bottlenecks encountered in a supply‑chain planning system and shares tools and methods used to resolve them, focusing on practical solutions rather than deep theoretical details.
Business Background
The system stores planning, dimensional, and configuration data in TiDB, while large historical reference data resides in ClickHouse. As business grew, requirements for filtering historical data and updating business tags caused long T+1 activation cycles, excessive memory usage, and query latencies exceeding 10 seconds.
Solution
Experiments showed that performing aggregation directly in SQL on either TiDB or ClickHouse is far faster than in‑memory aggregation, reducing execution time from about 5 seconds to roughly 300 ms. The main optimization is to sync business configuration data to ClickHouse and join results from TiDB and ClickHouse.
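As a sketch of the application-side piece of this approach, once each store has done its own aggregation in SQL, the service only needs to merge two small result sets by key. The following Python is illustrative only (the field names `dept_id`, `plan`, and `gmv` are hypothetical, not the production schema):

```python
# Illustrative sketch: after aggregating in SQL on TiDB and ClickHouse,
# the application merges two small, pre-aggregated result sets by key
# instead of aggregating millions of raw rows in memory.

def merge_by_key(tidb_rows, ch_rows, key):
    """Left-join TiDB config/plan rows with ClickHouse aggregate rows on `key`."""
    ch_index = {row[key]: row for row in ch_rows}
    merged = []
    for row in tidb_rows:
        hit = ch_index.get(row[key], {})   # missing keys simply add no columns
        merged.append({**row, **hit})
    return merged

tidb_rows = [{"dept_id": 1, "plan": 100}, {"dept_id": 2, "plan": 50}]
ch_rows = [{"dept_id": 1, "gmv": 2.5}]    # already aggregated in ClickHouse SQL
result = merge_by_key(tidb_rows, ch_rows, "dept_id")
```

The expensive work (scanning and summing history rows) stays in the databases; the merge step touches only a handful of rows per request.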
1. ClickHouse ReplacingMergeTree Table and Dimension‑Table Mode
After reviewing the official documentation, we chose ReplacingMergeTree with the FINAL keyword for deduplication. The following DDL creates a replicated table suitable for production use:
<code>CREATE TABLE IF NOT EXISTS library.blacklist ON CLUSTER xx (
`dept_id_1` Int32 COMMENT 'level-1 department ID',
`dept_id_2` Int32 COMMENT 'level-2 department ID',
`dept_id_3` Int32 COMMENT 'level-3 department ID',
`saler` String COMMENT 'sales ERP account',
`pur_controller` String COMMENT 'purchasing-control ERP account',
`update_time` DateTime COMMENT 'update time',
`is_deleted` UInt8 COMMENT 'deletion flag: 0 = active, 1 = deleted'
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/xx/jdob_ha/sop_pre_mix/blacklist/{shard_dict}', '{replica_dict}', update_time)
ORDER BY (dept_id_3, saler, pur_controller)
SETTINGS storage_policy = 'jdob_ha';</code>
Key points:
Use ReplicatedReplacingMergeTree instead of plain ReplacingMergeTree.
The path parameters are JD‑specific metadata keys and can be left as shown.
update_time is the version column: during background merges (and under FINAL), the row with the latest update_time is kept for each ORDER BY key.
This table follows the dimension‑table pattern, storing a full copy on each node.
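The keep-latest semantics can be modeled in a few lines of Python. This is an illustrative model of ReplacingMergeTree deduplication with a version column, not ClickHouse internals:

```python
# Illustrative model of ReplacingMergeTree deduplication: for each
# ORDER BY key, only the row with the largest version column survives.

def replacing_merge(rows, key_cols, version_col):
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in latest or row[version_col] >= latest[key][version_col]:
            latest[key] = row
    return list(latest.values())

rows = [
    {"dept_id_3": 7, "saler": "a", "pur_controller": "p", "update_time": 1, "is_deleted": 0},
    {"dept_id_3": 7, "saler": "a", "pur_controller": "p", "update_time": 2, "is_deleted": 1},
]
deduped = replacing_merge(rows, ["dept_id_3", "saler", "pur_controller"], "update_time")
```

Note that in ClickHouse this collapse happens asynchronously during merges, which is why queries use FINAL to force deduplicated reads.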
2. ClickHouse Local Join
Local join executes the join on each distributed node before sending results to the aggregating node, offering far lower latency and resource consumption than the default Global Join.
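The flow can be sketched as a toy scatter-gather model in Python (all names are hypothetical; real execution is parallel across nodes): each shard joins its local slice of the fact table against its full local copy of the dimension table, and the initiator only concatenates the per-shard results.

```python
# Toy model of a local join: each shard joins its slice of the fact
# table with its full copy of the dimension table; the initiating node
# merely concatenates the per-shard outputs.

def local_join(fact_shard, dim_full):
    dim = {d["key"]: d for d in dim_full}
    return [{**f, **dim[f["key"]]} for f in fact_shard if f["key"] in dim]

def distributed_query(fact_shards, dim_full):
    out = []
    for shard in fact_shards:          # conceptually runs in parallel per node
        out.extend(local_join(shard, dim_full))
    return out

fact_shards = [[{"key": 1, "amount": 10}], [{"key": 2, "amount": 20}]]
dim_full = [{"key": 1, "tag": "x"}, {"key": 2, "tag": "y"}]
rows = distributed_query(fact_shards, dim_full)
```

No fact rows cross the network before the join, which is where the latency and memory savings over a global join come from.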
Requirements for Data Sharding
Both tables must be sharded with the same sharding function; rand() cannot be used. A consistent‑hash algorithm is recommended.
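The co-location requirement can be illustrated with a small Python sketch. This uses a simple deterministic hash-modulo helper (the function name `shard_of` is hypothetical; true consistent hashing additionally minimizes data movement when shards are added):

```python
import hashlib

# Illustrative sharding helper: a deterministic hash of the join key
# routes rows of BOTH tables to the same shard. rand() cannot provide
# this, because matching keys would land on arbitrary shards.

def shard_of(key: str, num_shards: int) -> int:
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_shards

# Matching join keys from the fact table and the dimension table
# always co-locate on the same shard:
fact_shard = shard_of("dept_42", 9)
dim_shard = shard_of("dept_42", 9)
```

With co-located data (or a full-replica dimension table), each shard can answer its part of the join without shuffling rows.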
<code>-- Distributed table example
CREATE TABLE IF NOT EXISTS sop_pre_mix.history ON CLUSTER xx AS sop_pre_mix.history_local ENGINE = Distributed('xx','sop_pre_mix','history_local',rand());</code>
Local Join Syntax Pitfalls
The correct pattern is:
<code>SELECT *
FROM a.dis JOIN (SELECT * FROM b.local WHERE b.cond2) ON a.key = b.key
WHERE a.cond1;</code>
Filters on the distributed left table must appear in the outer WHERE clause; wrapping the left side in a subquery breaks the local join.
In older ClickHouse versions, the dimension table must be referenced with its database prefix (e.g., sop_pre_mix.sop_sale_plan_rule_core_dim), or the engine will report “table not found”.
Final Local Join Presentation
Left table (distributed, ~2.9 billion rows):
<code>CREATE TABLE IF NOT EXISTS sop_prod_mix.sop_sale_history_week_local ON CLUSTER xx (
dept_id_1 Int32 COMMENT 'level-1 department ID',
dept_name_1 String COMMENT 'level-1 department name',
...
sale_amount_lunar_sp Decimal(20,2) COMMENT 'same-period self-operated sales outbound amount',
dt String COMMENT 'data date'
) ENGINE = ReplicatedMergeTree('/clickhouse/LFRH_CK_Pub_115/jdob_ha/sop_prod_mix/sop_sale_history_week_local/{shard}', '{replica}')
PARTITION BY dt
ORDER BY (dept_id_1,dept_id_2,dept_id_3,saler,pur_controller,cate_id_3,ym,ymw)
SETTINGS storage_policy = 'jdob_ha', index_granularity = 8192;
CREATE TABLE IF NOT EXISTS sop_prod_mix.sop_sale_history_week ON CLUSTER xx AS sop_prod_mix.sop_sale_history_week_local ENGINE = Distributed('xx','sop_prod_mix','sop_sale_history_week_local',rand());</code>
Right table (dimension, ~4,500 rows):
<code>CREATE TABLE IF NOT EXISTS sop_pre_mix.sop_dim_blacklist ON CLUSTER xx (
`dept_id_1` Int32 COMMENT 'level-1 department ID',
`dept_id_2` Int32 COMMENT 'level-2 department ID',
`dept_id_3` Int32 COMMENT 'level-3 department ID',
`saler` String COMMENT 'sales ERP account',
`pur_controller` String COMMENT 'purchasing-control ERP account',
`update_time` DateTime COMMENT 'update time',
`is_deleted` UInt8 COMMENT 'deletion flag: 0 = active, 1 = deleted'
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/xx/jdob_ha/sop_pre_mix/sop_dim_blacklist/{shard_dict}', '{replica_dict}', update_time)
ORDER BY (dept_id_3, saler, pur_controller)
SETTINGS storage_policy = 'jdob_ha';</code>
Sample query that leverages the local join:
<code>SELECT a.ymw AS ymw,
a.dept_id_2 AS dept_id_2,
a.dept_id_3 AS dept_id_3,
a.week AS week,
a.cold_type AS cold_type,
a.year AS year,
a.net_type AS net_type,
a.saler AS saler,
a.pur_controller AS pur_controller,
a.dept_id_1 AS dept_id_1,
a.month AS month,
a.ym AS ym,
CASE WHEN a.dept_id_1 != c.dept_id_1 OR c.dept_id_1 IS NULL THEN -100 ELSE c.core_dim_id END AS core_dim_id,
CASE WHEN a.dept_id_1 != c.dept_id_1 OR c.dept_id_1 IS NULL THEN -100 ELSE c.core_dim_id END AS brand_id,
SUM(initial_inv_amount) AS initial_inv_amount,
SUM(gmv_lunar_sp) AS gmvLunarSp
FROM sop_pur_history_week a
LEFT JOIN (
SELECT dept_id_2, dept_id_3, pur_controller, saler
FROM sop_pre_mix.sop_dim_blacklist FINAL
WHERE is_deleted = 0 AND dept_id_3 IN (12345,23456,...)
) b ON a.dept_id_3 = b.dept_id_3 AND a.pur_controller = b.pur_controller AND a.saler = b.saler
LEFT JOIN (
SELECT dept_id_1, dept_id_2, dept_id_3, pur_controller, saler, core_dim_id
FROM sop_pre_mix.sop_sale_plan_rule_core_dim FINAL
WHERE is_deleted = 0 AND dept_id_3 IN (12345,23456,...) AND core_dim_id IN (12310) AND plan_dim = 'brand'
) c ON a.dept_id_1 = c.dept_id_1 AND a.dept_id_2 = c.dept_id_2 AND a.dept_id_3 = c.dept_id_3 AND a.saler = c.saler AND a.pur_controller = c.pur_controller AND a.brand_id = c.core_dim_id
WHERE dt = '2023-12-16'
AND a.dept_id_3 IN (12345,23456,...)
AND a.brand_id IN (12310)
AND (a.dept_id_3 != b.dept_id_3 OR b.dept_id_3 IS NULL)
GROUP BY a.ymw, a.dept_id_2, a.dept_id_3, a.week, a.cold_type, a.year, a.net_type, a.saler, a.pur_controller, a.dept_id_1, a.month, a.ym, c.dept_id_1, core_dim_id;</code>
On a 9‑shard, 18‑node cluster, the local‑join version reads far fewer rows and uses far less memory than the global‑join alternative, and the savings exceed tenfold and grow further as the shard count increases.
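The LEFT JOIN on the blacklist combined with the `b.dept_id_3 IS NULL` condition is an anti-join: history rows whose (dept_id_3, saler, pur_controller) combination appears in the active blacklist are excluded. A toy Python model of that filtering logic (hypothetical field values, not production data):

```python
# Toy model of the LEFT JOIN ... IS NULL anti-join on the blacklist:
# rows whose (dept_id_3, saler, pur_controller) appear in the active
# blacklist are dropped; everything else passes through.

def anti_join(history_rows, blacklist_rows):
    banned = {(b["dept_id_3"], b["saler"], b["pur_controller"])
              for b in blacklist_rows if b["is_deleted"] == 0}
    return [h for h in history_rows
            if (h["dept_id_3"], h["saler"], h["pur_controller"]) not in banned]

history = [
    {"dept_id_3": 1, "saler": "s1", "pur_controller": "p1", "gmv": 10},
    {"dept_id_3": 2, "saler": "s2", "pur_controller": "p2", "gmv": 20},
]
blacklist = [{"dept_id_3": 1, "saler": "s1", "pur_controller": "p1", "is_deleted": 0}]
kept = anti_join(history, blacklist)
```

In the SQL above this filtering happens locally on each shard, since every node holds a full copy of the small blacklist dimension table.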
Final Optimization Effects
The changes eliminated frequent OOM incidents during regular queries and delivered stable performance improvements. Test‑environment benchmarks (shown in the following chart) confirm the gains, and production results are even better.
JD Cloud Developers
JD Cloud Developers (the developer community of JD Technology Group) is a platform for technical sharing and communication among AI, cloud‑computing, IoT, and related developers. It publishes JD product technical information, industry content, and tech‑event news. Embrace technology and partner with developers to envision the future.