How ByteHouse Transforms ClickHouse for Complex Queries: Multi‑Stage Execution and Real‑World Optimizations
This article explains how ByteHouse, a heavily optimized fork of ClickHouse, introduces a multi‑stage execution model, advanced exchange mechanisms, and runtime filters to overcome the limitations of the original two‑stage query flow, delivering significant performance gains for complex joins, aggregations, and large‑scale analytics workloads.
Project Background
ClickHouse has become a mainstream open‑source analytical engine, but as data volumes and query complexity grow, its traditional two‑stage execution model can cause bottlenecks, especially for heavy aggregations, large joins, and nested sub‑queries.
ByteHouse, developed by engineers at ByteDance, is a deeply optimized version of ClickHouse that adds capabilities to efficiently handle complex queries across massive clusters.
Technical Solution
Design Philosophy
The solution replaces ClickHouse’s two‑stage execution with a multi‑stage model similar to Presto or Impala. Queries are split into independent Stages that exchange data only through an ExchangeManager, so no data is shuffled inside a Stage.
Key Concepts
ExchangeNode: Represents a data‑exchange point in the query plan.
PlanSegment: The executable fragment for a single Stage.
ExchangeManager: Manages push‑based data transfer and back‑pressure between Stages.
SegmentScheduler: Dispatches PlanSegments to workers based on dependencies and resource availability.
InterpreterPlanSegment: Executes a serialized PlanSegment on a worker node.
Execution Flow
Coordinator receives a complex query and inserts ExchangeNodes into the syntax tree.
Each ExchangeNode defines a Stage; the planner generates a PlanSegment for each Stage.
SegmentScheduler sends the PlanSegments to appropriate workers.
Workers run InterpreterPlanSegment, reading inputs (local tables or exchange inputs), executing the plan, and pushing results to downstream ExchangeManagers.
Coordinator aggregates final results from the last Stage and returns them to the client.
Plan Splitting Example
A two‑table join, for example, is divided into four Stages.
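The following is a minimal sketch of how a plan tree might be cut into PlanSegments at its ExchangeNodes. It is not ByteHouse's actual planner: the `PlanNode` class, the operator names, and the particular four‑Stage layout (two scan Stages, one join Stage, one final Stage) are assumptions chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical plan-tree node; exchange nodes mark Stage boundaries."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    is_exchange: bool = False


@dataclass
class PlanSegment:
    """Hypothetical executable fragment for one Stage."""
    segment_id: int
    operators: List[str] = field(default_factory=list)
    inputs: List[int] = field(default_factory=list)  # ids of upstream segments


def split_into_segments(root: PlanNode) -> List[PlanSegment]:
    """Cut the tree at every exchange node; each cut starts a new segment."""
    segments: List[PlanSegment] = []

    def new_segment() -> PlanSegment:
        seg = PlanSegment(segment_id=len(segments))
        segments.append(seg)
        return seg

    def visit(node: PlanNode, current: PlanSegment) -> None:
        if node.is_exchange:
            # Everything below an exchange runs in a separate upstream segment.
            for child in node.children:
                upstream = new_segment()
                current.inputs.append(upstream.segment_id)
                visit(child, upstream)
        else:
            current.operators.append(node.name)
            for child in node.children:
                visit(child, current)

    visit(root, new_segment())
    return segments


# Assumed layout for a two-table join: scan, scan, join, final output.
plan = PlanNode("Output", [
    PlanNode("GatherExchange", is_exchange=True, children=[
        PlanNode("HashJoin", [
            PlanNode("ShuffleExchange", is_exchange=True,
                     children=[PlanNode("Scan(left)")]),
            PlanNode("ShuffleExchange", is_exchange=True,
                     children=[PlanNode("Scan(right)")]),
        ]),
    ]),
])

segments = split_into_segments(plan)
for seg in segments:
    print(seg.segment_id, seg.operators, "reads from", seg.inputs)
```

Under these assumptions the three exchange nodes yield four segments: the final output segment reads from the join segment, which in turn reads from the two scan segments.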
Segment Scheduling Strategies
Dependency‑Based Scheduling: Builds a DAG of Stage dependencies and launches Stages only after their predecessors finish.
All‑At‑Once: Computes all Stage metadata upfront and dispatches them simultaneously, reducing latency at the cost of lower fault tolerance.
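The difference between the two strategies can be sketched with Python's standard `graphlib` module (Python 3.9+). The Stage ids and the dependency map below are hypothetical, and the functions only compute dispatch order; they do not model workers or failures.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def dependency_based_waves(deps: Dict[int, Set[int]]) -> List[List[int]]:
    """Return Stages grouped into waves; a wave starts once its inputs finished.

    `deps` maps each Stage id to the set of Stage ids it reads from.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves: List[List[int]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        # Pretend the whole wave has finished before the next one is launched.
        sorter.done(*ready)
    return waves


def all_at_once(deps: Dict[int, Set[int]]) -> List[List[int]]:
    """Return a single wave containing every Stage."""
    stages = set(deps)
    for upstream in deps.values():
        stages |= upstream
    return [sorted(stages)]


# Hypothetical four-Stage join: Stage 0 reads Stage 1, which reads Stages 2 and 3.
stage_deps = {0: {1}, 1: {2, 3}, 2: set(), 3: set()}

print(dependency_based_waves(stage_deps))  # [[2, 3], [1], [0]]
print(all_at_once(stage_deps))             # [[0, 1, 2, 3]]
```

Dependency-based scheduling needs three dispatch rounds here, while all-at-once needs one, which is where the latency difference comes from.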
Data Exchange Optimizations
ExchangeManager uses push‑based transfer with fine‑grained memory control, connection reuse, and optional compression or RDMA to minimise network overhead and avoid OOM situations.
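As a rough illustration of push‑based transfer with a memory bound, the sketch below models a receiver‑side buffer that rejects a push when its byte budget would be exceeded. The class name `ExchangeBuffer`, its methods, and the byte budget are hypothetical and are not taken from ByteHouse's code.

```python
from collections import deque
from typing import Deque, Optional


class ExchangeBuffer:
    """Hypothetical bounded receive buffer for one exchange input."""

    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        self.used_bytes = 0
        self.chunks: Deque[bytes] = deque()
        self.backpressure_events = 0  # how often a sender was told to wait

    def try_push(self, chunk: bytes) -> bool:
        """Accept the chunk if it fits in the budget; otherwise signal back-pressure."""
        if self.used_bytes + len(chunk) > self.max_bytes:
            self.backpressure_events += 1
            return False
        self.chunks.append(chunk)
        self.used_bytes += len(chunk)
        return True

    def pop(self) -> Optional[bytes]:
        """Hand the oldest chunk to the downstream operator and free its memory."""
        if not self.chunks:
            return None
        chunk = self.chunks.popleft()
        self.used_bytes -= len(chunk)
        return chunk


buf = ExchangeBuffer(max_bytes=8)
accepted = [buf.try_push(b"aaaa"), buf.try_push(b"bbbb"), buf.try_push(b"cc")]
first = buf.pop()               # downstream consumes one chunk, freeing 4 bytes
retried = buf.try_push(b"cc")   # the sender's retry now fits
print(accepted, first, retried, buf.backpressure_events)
```

Bounding the buffer in bytes rather than in chunk count is one way to keep memory use predictable when chunk sizes vary; the rejected push is the back‑pressure signal the sender reacts to.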
Optimization and Diagnosis
Join Implementations
Shuffle Join – general purpose.
Broadcast Join – replicates the small right table to all workers.
Colocate Join – leverages data co‑location to avoid shuffling.
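A simple rule for picking among the three implementations might look like the sketch below. The `TableInfo` fields, the 64 MiB broadcast limit, and the rule order are assumptions for illustration; a real optimizer would also check details such as bucket counts and statistics quality.

```python
from dataclasses import dataclass


@dataclass
class TableInfo:
    """Hypothetical planner-side summary of a join input."""
    name: str
    estimated_bytes: int
    distribution_key: str  # column the table is distributed by


# Assumed threshold; the right value depends on cluster memory and network.
BROADCAST_LIMIT_BYTES = 64 * 1024 * 1024


def choose_join_strategy(left: TableInfo, right: TableInfo, join_key: str,
                         broadcast_limit: int = BROADCAST_LIMIT_BYTES) -> str:
    """Pick a join implementation using simplified, illustrative rules."""
    # Both sides already distributed by the join key: no data needs to move.
    if left.distribution_key == join_key and right.distribution_key == join_key:
        return "colocate"
    # Small right table: copy it to every worker instead of shuffling both sides.
    if right.estimated_bytes <= broadcast_limit:
        return "broadcast"
    # General case: repartition both sides by the join key.
    return "shuffle"


orders = TableInfo("orders", 500 * 1024**3, "order_id")
customers = TableInfo("customers", 10 * 1024**2, "customer_id")
lineitems = TableInfo("lineitems", 900 * 1024**3, "order_id")
events = TableInfo("events", 300 * 1024**3, "event_id")

print(choose_join_strategy(orders, customers, "customer_id"))  # broadcast
print(choose_join_strategy(orders, lineitems, "order_id"))     # colocate
print(choose_join_strategy(orders, events, "user_id"))         # shuffle
```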
Network Connection Optimizations
By reusing a fixed pool of connections across queries, the system prevents the explosion of connections that occurs in large clusters with many parallel Stages.
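The effect of connection reuse can be sketched with a small pool that caps the number of connections per remote host and hands them out round‑robin. `ConnectionPool` and `FakeConnection` are hypothetical stand‑ins; no real sockets are opened.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeConnection:
    """Stand-in for a network connection to one worker."""
    host: str
    index: int


class ConnectionPool:
    """Hypothetical pool: at most `connections_per_host` connections per host."""

    def __init__(self, connections_per_host: int) -> None:
        self.connections_per_host = connections_per_host
        self._pools: Dict[str, List[FakeConnection]] = {}
        self._next: Dict[str, int] = {}
        self.opened = 0  # total connections ever created

    def get(self, host: str) -> FakeConnection:
        pool = self._pools.setdefault(host, [])
        if len(pool) < self.connections_per_host:
            conn = FakeConnection(host, len(pool))
            pool.append(conn)
            self.opened += 1
            return conn
        # Pool is full: reuse an existing connection in round-robin order.
        idx = self._next.get(host, 0)
        self._next[host] = (idx + 1) % len(pool)
        return pool[idx]


pool = ConnectionPool(connections_per_host=2)
requests = 0
for query in range(100):          # 100 queries
    for stage in range(4):        # each with 4 Stages
        for host in ("worker-1", "worker-2"):
            pool.get(host)
            requests += 1

print(requests, pool.opened)  # 800 4
```

In this toy run 800 transfer requests share four connections, whereas opening one connection per Stage per query would have created 800.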
Runtime Filters
Runtime filters (min‑max or bloom filters) are built on the right‑hand side of a join and pushed to the left‑hand side to prune irrelevant rows before the join, dramatically reducing data movement for star‑schema queries.
Workflow:
Workers on the right table generate a runtime filter and send it to the coordinator.
The coordinator merges filters from all workers and distributes the merged filter to left‑table workers.
Left‑table workers apply the filter during execution, potentially before the full plan runs.
Filters are only beneficial when the right table is relatively small and the filter has high selectivity; otherwise they may add overhead.
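The workflow above can be sketched as follows, combining a min‑max range with a small bloom filter. The `RuntimeFilter` class, the bit‑array size, and the hashing scheme are assumptions for illustration, and `build` assumes a non‑empty key list.

```python
import hashlib
from dataclasses import dataclass
from typing import Iterable, Iterator

NUM_BITS = 1024   # assumed bloom filter size
NUM_HASHES = 3    # assumed number of hash functions


def _bit_positions(key: int) -> Iterator[int]:
    """Derive NUM_HASHES deterministic bit positions for a key."""
    for seed in range(NUM_HASHES):
        digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
        yield int.from_bytes(digest[:8], "big") % NUM_BITS


@dataclass
class RuntimeFilter:
    """Hypothetical runtime filter: min-max range plus bloom bits."""
    min_key: int
    max_key: int
    bits: int  # bloom filter bit set stored in a Python int

    @classmethod
    def build(cls, keys: Iterable[int]) -> "RuntimeFilter":
        """Built by a right-table worker from its local join keys (non-empty)."""
        keys = list(keys)
        bits = 0
        for key in keys:
            for pos in _bit_positions(key):
                bits |= 1 << pos
        return cls(min(keys), max(keys), bits)

    def merge(self, other: "RuntimeFilter") -> "RuntimeFilter":
        """Coordinator-side merge: widen the range and OR the bloom bits."""
        return RuntimeFilter(min(self.min_key, other.min_key),
                             max(self.max_key, other.max_key),
                             self.bits | other.bits)

    def might_contain(self, key: int) -> bool:
        """Applied by left-table workers; false positives are possible."""
        if key < self.min_key or key > self.max_key:
            return False
        return all((self.bits >> pos) & 1 for pos in _bit_positions(key))


# Two right-table workers build local filters; the coordinator merges them.
merged = RuntimeFilter.build([10, 12]).merge(RuntimeFilter.build([15, 11]))

# A left-table worker prunes rows before they are sent to the join.
left_keys = range(100)
kept = [k for k in left_keys if merged.might_contain(k)]
print(len(kept), "of", len(left_keys), "left rows survive the filter")
```

The filter never drops a matching row, but it may let a few non‑matching rows through, so the join itself still has to check equality.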
Diagnostics and Metrics
Extensive metrics (query latency, per‑Stage timing, I/O volume, operator profiles) and back‑pressure signals are collected to pinpoint bottlenecks. Recording queue lengths and back‑pressure events helps infer Stage health.
Results and Outlook
Performance Gains
Benchmarks on a 1 TB SSB dataset (8‑node cluster) show:
Complex second‑stage aggregations reduced from 8.514 s to 2.198 s via parallel shuffle aggregation.
Hash Join with a large right table improved from 17.210 s to 1.749 s using sub‑query push‑down and shuffle.
Five‑table join execution time dropped from 8.583 s to 4.464 s, with all right tables read in parallel.
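For reference, the speedup ratios implied by the timings above can be computed directly. The labels are shorthand for the three benchmark cases; the numbers are the ones quoted in the list.

```python
# (before, after) timings in seconds, as quoted above.
benchmarks = {
    "second-stage aggregation": (8.514, 2.198),
    "hash join, large right table": (17.210, 1.749),
    "five-table join": (8.583, 4.464),
}

speedups = {name: before / after for name, (before, after) in benchmarks.items()}

for name, ratio in speedups.items():
    print(f"{name}: {ratio:.2f}x faster")
```

This works out to roughly 3.9x, 9.8x, and 1.9x respectively.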
Further gains are expected when combining these optimizations with advanced cost‑based optimizer (CBO) rules such as predicate push‑down, join order selection, and join implementation choice.
Future Directions
Continue enhancing Stage execution and Exchange performance (e.g., better indexing, operator improvements).
Expand metrics and intelligent diagnostics to lower the on‑call burden and provide automated tuning suggestions.
Explore deeper integration of CBO and runtime filter strategies for even more complex analytical workloads.
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
