How ByteHouse Transforms ClickHouse for Complex Queries: Multi‑Stage Execution and Real‑World Optimizations

This article explains how ByteHouse, a heavily optimized fork of ClickHouse, introduces a multi‑stage execution model, advanced exchange mechanisms, and runtime filters to overcome the limitations of the original two‑stage query flow, delivering significant performance gains for complex joins, aggregations, and large‑scale analytics workloads.

ITPUB

Project Background

ClickHouse has become a mainstream open‑source analytical engine, but as data volumes and query complexity grow, its traditional two‑stage execution model can cause bottlenecks, especially for heavy aggregations, large joins, and nested sub‑queries.

ByteHouse, developed by engineers at ByteDance, is a deeply optimized version of ClickHouse that adds the capabilities needed to run complex queries efficiently across large clusters.

Technical Solution

Design Philosophy

The solution replaces ClickHouse’s two‑stage execution with a multi‑stage model similar to that of Presto or Impala. A query is split into Stages; data moves only between Stages, through an ExchangeManager, and tasks within a Stage do not exchange data with one another.

Key Concepts

ExchangeNode: Represents a data‑exchange point in the query plan.

PlanSegment: The executable fragment for a single Stage.

ExchangeManager: Manages push‑based data transfer and back‑pressure between Stages.

SegmentScheduler: Dispatches PlanSegments to workers based on dependencies and resource availability.

InterpreterPlanSegment: Executes a serialized PlanSegment on a worker node.

Execution Flow

The Coordinator receives a complex query and inserts ExchangeNodes into the syntax tree.

Each ExchangeNode marks a Stage boundary; the planner generates a PlanSegment for each Stage.

SegmentScheduler sends the PlanSegments to appropriate workers.

Workers run InterpreterPlanSegment, reading inputs (local tables or exchange inputs), executing the plan, and pushing results to downstream ExchangeManagers.

Coordinator aggregates final results from the last Stage and returns them to the client.

Plan Splitting Example

A two‑table join is divided into four Stages.
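One way a two-table join can end up as four Stages is sketched below. This is a minimal illustration, not ByteHouse's planner: the `PlanNode` class, the `split_plan` function, and the placement of the three exchanges (one above each scan, one below the final output) are all assumptions made for the example. Only the names ExchangeNode, PlanSegment, and Stage come from the article.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """A node in a simplified query plan tree (hypothetical structure)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    is_exchange: bool = False


@dataclass
class PlanSegment:
    """The fragment of the plan that one Stage executes."""
    segment_id: int
    operators: List[str] = field(default_factory=list)
    inputs: List[int] = field(default_factory=list)  # upstream segment ids


def split_plan(root: PlanNode) -> List[PlanSegment]:
    """Cut the plan at every exchange node; each cut starts a new segment."""
    segments: List[PlanSegment] = []

    def new_segment() -> PlanSegment:
        seg = PlanSegment(segment_id=len(segments))
        segments.append(seg)
        return seg

    def visit(node: PlanNode, seg: PlanSegment) -> None:
        if node.is_exchange:
            # The subtree below an exchange runs in its own Stage.
            child_seg = new_segment()
            seg.inputs.append(child_seg.segment_id)
            for child in node.children:
                visit(child, child_seg)
        else:
            seg.operators.append(node.name)
            for child in node.children:
                visit(child, seg)

    visit(root, new_segment())
    return segments


# Assumed exchange placement for a two-table shuffle join.
plan = PlanNode("Output", [
    PlanNode("Gather", is_exchange=True, children=[
        PlanNode("Join", [
            PlanNode("ShuffleLeft", is_exchange=True, children=[PlanNode("ScanLeft")]),
            PlanNode("ShuffleRight", is_exchange=True, children=[PlanNode("ScanRight")]),
        ]),
    ]),
])

segments = split_plan(plan)
for seg in segments:
    print(seg.segment_id, seg.operators, "reads from", seg.inputs)
```

Under these assumptions the split yields one output Stage, one join Stage, and one scan Stage per table, with the join Stage reading from both scan Stages.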

Segment Scheduling Strategies

Dependency‑Based Scheduling: Builds a DAG of Stage dependencies and launches Stages only after their predecessors finish.

All‑At‑Once: Computes all Stage metadata upfront and dispatches them simultaneously, reducing latency at the cost of lower fault tolerance.
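The difference between the two strategies can be sketched with Python's standard `graphlib` module. The Stage numbering and the dependency map below are made up for illustration, and the functions only compute launch order; a real scheduler would also pick workers and track resources.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def dependency_based_waves(deps: Dict[int, Set[int]]) -> List[List[int]]:
    """Group Stages into waves; a wave launches only after every
    Stage it depends on has finished."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves: List[List[int]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # stand-in for "these Stages completed"
    return waves


def all_at_once(deps: Dict[int, Set[int]]) -> List[List[int]]:
    """Dispatch every Stage in a single wave."""
    stages = set(deps)
    for upstream in deps.values():
        stages |= upstream
    return [sorted(stages)]


# Hypothetical plan: Stage 0 reads from Stage 1 (the join),
# which reads from Stages 2 and 3 (the scans).
stage_deps = {0: {1}, 1: {2, 3}, 2: set(), 3: set()}

print(dependency_based_waves(stage_deps))  # [[2, 3], [1], [0]]
print(all_at_once(stage_deps))             # [[0, 1, 2, 3]]
```

Dependency-based scheduling takes three rounds for this plan, while all-at-once takes one, which is where the latency saving comes from.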

Data Exchange Optimizations

ExchangeManager uses push‑based transfer with fine‑grained memory control, connection reuse, and optional compression or RDMA to minimise network overhead and avoid OOM situations.
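Push-based transfer with back-pressure can be illustrated with a bounded queue between a sending and a receiving thread. This is a single-process analogy under stated assumptions, not ByteHouse's ExchangeManager: the `sender` and `receiver` functions, the buffer size of 4, and the chunk contents are hypothetical.

```python
import queue
import threading
from typing import List

END_OF_STREAM = None  # sentinel marking the end of the upstream output


def sender(channel: "queue.Queue", chunks: List[bytes]) -> None:
    """Upstream Stage: push chunks; put() blocks while the buffer is full."""
    for chunk in chunks:
        channel.put(chunk)  # back-pressure: waits until the receiver frees a slot
    channel.put(END_OF_STREAM)


def receiver(channel: "queue.Queue", received: List[bytes]) -> None:
    """Downstream Stage: drain chunks until the sentinel arrives."""
    while True:
        chunk = channel.get()
        if chunk is END_OF_STREAM:
            break
        received.append(chunk)


# At most 4 chunks are buffered per channel, which caps memory use.
channel: "queue.Queue" = queue.Queue(maxsize=4)
chunks = [f"block-{i}".encode() for i in range(100)]
received: List[bytes] = []

threads = [
    threading.Thread(target=sender, args=(channel, chunks)),
    threading.Thread(target=receiver, args=(channel, received)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(received), received[0], received[-1])
```

Because the buffer is bounded, a slow receiver stalls the sender instead of letting unread data pile up in memory, which is the property that helps avoid OOM.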

Optimization and Diagnosis

Join Implementations

Shuffle Join – general purpose.

Broadcast Join – replicates the small right table to all workers.

Colocate Join – leverages data co‑location to avoid shuffling.
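How a planner might pick among the three implementations can be sketched as a simple rule chain. Everything here is an assumption for illustration: the `TableInfo` statistics, the 64 MiB broadcast threshold, and the rule that colocation only requires matching distribution keys (a real system would also need matching bucket counts and placement).

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class TableInfo:
    """Hypothetical planner statistics for one join input."""
    name: str
    estimated_bytes: int
    distribution_keys: Tuple[str, ...]  # columns the table is already partitioned by


# Illustrative threshold only; not a ByteHouse default.
BROADCAST_LIMIT_BYTES = 64 * 1024 * 1024


def choose_join(left: TableInfo, right: TableInfo, join_keys: Tuple[str, ...]) -> str:
    if left.distribution_keys == join_keys and right.distribution_keys == join_keys:
        # Matching rows already live on the same worker: no exchange needed.
        return "colocate"
    if right.estimated_bytes <= BROADCAST_LIMIT_BYTES:
        # Cheap to copy the right table to every worker.
        return "broadcast"
    # Fall back to repartitioning both sides by the join key.
    return "shuffle"


GIB = 1024 ** 3
orders = TableInfo("orders", 500 * GIB, ("user_id",))
users = TableInfo("users", 2 * GIB, ("user_id",))
dates = TableInfo("dates", 1024 * 1024, ())
events = TableInfo("events", 300 * GIB, ("event_id",))

print(choose_join(orders, users, ("user_id",)))   # colocate
print(choose_join(orders, dates, ("date_id",)))   # broadcast
print(choose_join(orders, events, ("user_id",)))  # shuffle
```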

Network Connection Optimizations

By reusing a fixed pool of connections across queries, the system prevents the explosion of connections that occurs in large clusters with many parallel Stages.
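The effect of connection reuse can be sketched with a small pool that caps the number of connections per remote host and multiplexes many exchange streams over them. The `Connection` and `ConnectionPool` classes, the round-robin policy, and the pool size are hypothetical; no real sockets are opened.

```python
import itertools
from collections import defaultdict
from typing import Dict, List, Tuple


class Connection:
    """Stand-in for a real network connection to one remote worker."""

    def __init__(self, host: str, index: int) -> None:
        self.host = host
        self.index = index
        # (query_id, exchange_id) pairs multiplexed over this connection
        self.streams: List[Tuple[str, int]] = []


class ConnectionPool:
    """Keeps at most `size` connections per remote host, shared across queries."""

    def __init__(self, size: int) -> None:
        self.size = size
        self._conns: Dict[str, List[Connection]] = defaultdict(list)
        self._next: Dict[str, itertools.count] = defaultdict(itertools.count)

    def acquire(self, host: str, query_id: str, exchange_id: int) -> Connection:
        conns = self._conns[host]
        if len(conns) < self.size:
            conns.append(Connection(host, len(conns)))  # open lazily, up to the cap
        conn = conns[next(self._next[host]) % len(conns)]  # round-robin choice
        conn.streams.append((query_id, exchange_id))
        return conn

    def open_connections(self) -> int:
        return sum(len(conns) for conns in self._conns.values())


pool = ConnectionPool(size=2)
for q in range(50):          # 50 concurrent queries
    for exchange in range(4):  # 4 exchanges each, all to the same worker
        pool.acquire("worker-1", f"q{q}", exchange)

print(pool.open_connections())  # 2
```

In this toy setup, 200 logical streams share 2 connections, whereas one connection per stream would have opened 200.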

Runtime Filters

Runtime filters (min‑max or bloom filters) are built on the right‑hand side of a join and pushed to the left‑hand side to prune irrelevant rows before the join, dramatically reducing data movement for star‑schema queries.

Workflow:

Workers on the right table generate a runtime filter and send it to the coordinator.

The coordinator merges filters from all workers and distributes the merged filter to left‑table workers.

Left‑table workers apply the filter during execution, potentially before the full plan runs.

Filters are only beneficial when the right table is relatively small and the filter has high selectivity; otherwise they may add overhead.
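The build, merge, and apply steps of the workflow can be sketched with a filter that combines a min-max range and a small Bloom filter. This is an illustrative implementation under stated assumptions: the `RuntimeFilter` class, its sizes, the hashing scheme, and the sample keys are all made up, and the keys are plain integers.

```python
import hashlib
from typing import Iterator, Optional


class RuntimeFilter:
    """Min-max range plus a small Bloom filter over integer join keys."""

    def __init__(self, num_bits: int = 1 << 12, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bitset
        self.min_key: Optional[int] = None
        self.max_key: Optional[int] = None

    def _positions(self, key: int) -> Iterator[int]:
        for seed in range(self.num_hashes):
            digest = hashlib.blake2b(f"{seed}:{key}".encode(), digest_size=8).digest()
            yield int.from_bytes(digest, "big") % self.num_bits

    def add(self, key: int) -> None:
        self.min_key = key if self.min_key is None else min(self.min_key, key)
        self.max_key = key if self.max_key is None else max(self.max_key, key)
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def merge(self, other: "RuntimeFilter") -> "RuntimeFilter":
        """Coordinator step: union of two filters built with the same parameters."""
        assert (self.num_bits, self.num_hashes) == (other.num_bits, other.num_hashes)
        merged = RuntimeFilter(self.num_bits, self.num_hashes)
        merged.bits = self.bits | other.bits
        mins = [k for k in (self.min_key, other.min_key) if k is not None]
        maxs = [k for k in (self.max_key, other.max_key) if k is not None]
        merged.min_key = min(mins) if mins else None
        merged.max_key = max(maxs) if maxs else None
        return merged

    def might_contain(self, key: int) -> bool:
        if self.min_key is None or not (self.min_key <= key <= self.max_key):
            return False  # pruned by the min-max range
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


# Two right-table workers each build a filter from their local join keys.
worker_a, worker_b = RuntimeFilter(), RuntimeFilter()
for key in (10, 11, 12):
    worker_a.add(key)
for key in (20, 21):
    worker_b.add(key)

# The coordinator merges them; left-table workers prune rows before the join.
merged = worker_a.merge(worker_b)
left_keys = range(1000)
kept = [k for k in left_keys if merged.might_contain(k)]
print(f"kept {len(kept)} of {len(left_keys)} left rows")
```

A Bloom filter never rejects a key that was added, so every matching row survives; it may let a few non-matching rows through, which the join itself then discards.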

Diagnostics and Metrics

Extensive metrics (query latency, per‑Stage timing, I/O volume, operator profiles) and back‑pressure signals are collected to pinpoint bottlenecks. Recording queue lengths and back‑pressure events helps infer Stage health.
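One way queue lengths could be used to infer Stage health is sketched below. The heuristic is an assumption for illustration, not a rule stated by ByteHouse: a Stage whose input queue stays nearly full while its output queue stays nearly empty is consuming more slowly than its upstream produces, so it is flagged as the likely bottleneck. The `StageSample` fields and thresholds are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StageSample:
    """Hypothetical averaged queue metrics for one Stage."""
    stage_id: int
    input_queue_fill: float   # 0.0 = empty, 1.0 = full
    output_queue_fill: float  # 0.0 = empty, 1.0 = full


def likely_bottleneck(samples: List[StageSample],
                      high: float = 0.9, low: float = 0.1) -> Optional[int]:
    """Return the first Stage that is backed up on input but starved on output."""
    for sample in samples:
        if sample.input_queue_fill >= high and sample.output_queue_fill <= low:
            return sample.stage_id
    return None


samples = [
    StageSample(stage_id=3, input_queue_fill=0.00, output_queue_fill=0.95),  # scan, blocked on send
    StageSample(stage_id=1, input_queue_fill=0.97, output_queue_fill=0.02),  # join, slow consumer
    StageSample(stage_id=0, input_queue_fill=0.05, output_queue_fill=0.00),  # output, waiting
]

print(likely_bottleneck(samples))  # 1
```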

Results and Outlook

Performance Gains

Benchmarks on a 1 TB SSB dataset (8‑node cluster) show:

Complex second‑stage aggregations reduced from 8.514 s to 2.198 s via parallel shuffle aggregation.

Hash Join with a large right table improved from 17.210 s to 1.749 s using sub‑query push‑down and shuffle.

Five‑table join execution time dropped from 8.583 s to 4.464 s, with all right tables read in parallel.
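The speedup factors implied by these timings can be computed directly. The labels are shortened from the text above; the timings are the ones reported.

```python
# (before, after) timings in seconds, as reported above
benchmarks = {
    "second-stage aggregation": (8.514, 2.198),
    "hash join, large right table": (17.210, 1.749),
    "five-table join": (8.583, 4.464),
}

speedups = {name: round(before / after, 2) for name, (before, after) in benchmarks.items()}

for name, factor in speedups.items():
    print(f"{name}: {factor:.2f}x faster")
# second-stage aggregation: 3.87x faster
# hash join, large right table: 9.84x faster
# five-table join: 1.92x faster
```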

Further gains are expected when combining these optimizations with advanced cost‑based optimizer (CBO) rules such as predicate push‑down, join order selection, and join implementation choice.

Future Directions

Continue enhancing Stage execution and Exchange performance (e.g., better indexing, operator improvements).

Expand metrics and intelligent diagnostics to lower the on‑call burden and provide automated tuning suggestions.

Explore deeper integration of CBO and runtime filter strategies for even more complex analytical workloads.
