How Ray Data’s LogicalOptimizer Transforms Plans for Faster Execution
This article explains Ray Data’s execution pipeline, detailing the LogicalOptimizer’s architecture, core abstractions, rule‑based optimization process, and both logical and physical rule sets, with concrete code examples and practical illustrations of each optimization technique.
Overall Architecture
Ray Data executes a plan through four stages: logical plan → logical optimization → physical plan → physical optimization. The LogicalOptimizer rewrites the logical DAG using a series of rules without changing semantics, preparing it for downstream planning and physical optimization.
```
LogicalPlan (DAG of LogicalOperator)
        │
        ▼ LogicalOptimizer.optimize(plan)    # apply rules in order, repeatedly, until the DAG stabilizes
        │
Optimized LogicalPlan
        │
        ▼ Planner.plan(...)                  # logical operators → physical operators
        │
PhysicalPlan (DAG of PhysicalOperator)
        │
        ▼ PhysicalOptimizer.optimize(plan)
        │
Optimized PhysicalPlan → execution
```
Logical Optimizer Principles and Flow
Core Abstractions
Plan : abstract plan holding the DAG root and context (e.g., DataContext).
LogicalPlan : DAG composed of LogicalOperator nodes.
Rule : optimization rule with apply(plan) → plan and optional dependencies() / dependents() to control ordering.
Optimizer : holds a set of Rule objects and repeatedly applies them until a fixed point is reached.
Ruleset : ordered collection of rules sorted by a dependency graph.
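These abstractions can be sketched as a minimal skeleton. The class and method names follow the article, but the bodies, and the toy ReplaceAOnce rule, are illustrative assumptions rather than Ray's actual implementation:

```python
from abc import ABC, abstractmethod
from typing import List


class Plan:
    """Holds the DAG; `dag_str` stands in for a structural fingerprint."""

    def __init__(self, dag_str: str):
        self.dag_str = dag_str


class Rule(ABC):
    """An optimization rule: apply(plan) -> plan."""

    @abstractmethod
    def apply(self, plan: Plan) -> Plan:
        ...


class Optimizer:
    """Applies rules in order until a full pass changes nothing."""

    def __init__(self, rules: List[Rule]):
        self.rules = rules

    def optimize(self, plan: Plan) -> Plan:
        while True:
            before = plan.dag_str
            for rule in self.rules:  # Ruleset order
                plan = rule.apply(plan)
            if plan.dag_str == before:  # fixed point reached
                return plan


class ReplaceAOnce(Rule):
    """Toy rule: rewrites one 'A' node per application."""

    def apply(self, plan: Plan) -> Plan:
        return Plan(plan.dag_str.replace("A", "B", 1))


# "AAA" -> "BAA" -> "BBA" -> "BBB", then a pass changes nothing.
result = Optimizer([ReplaceAOnce()]).optimize(Plan("AAA"))
```

The fixed-point loop is what lets one rule's rewrite expose new opportunities for another rule on the next pass.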
Optimization Process (Optimizer.optimize)
```python
def optimize(self, plan: Plan) -> Plan:
    previous_plan = plan
    while True:
        for rule in self.rules:  # applied in Ruleset order
            plan = rule.apply(plan)
        if plan.dag.dag_str == previous_plan.dag.dag_str:
            break  # fixed point: DAG string unchanged
        previous_plan = plan
    return plan
```
Key points :
Multiple iterations: each round applies all rules in a fixed order until the DAG string stops changing.
Rule order is derived from LogicalOptimizer.rules via get_logical_ruleset(), which builds a dependency graph and produces a topological order.
Inside a rule, plan.dag._apply_transform(transform) performs a post‑order traversal, applying transform(node) → node and copying nodes when children change, ensuring upstream nodes are already optimized.
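How a dependency graph yields a rule order can be sketched with the standard-library topological sort. The dependency edge shown here is hypothetical, purely for illustration:

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+
from typing import Dict, List, Set


def order_rules(deps: Dict[str, Set[str]]) -> List[str]:
    """Return rule names so that every rule runs after its dependencies.

    `deps[r]` is the set of rules that must be applied before `r`.
    """
    return list(TopologicalSorter(deps).static_order())


# Hypothetical edge: let predicate pushdown see already-merged projections.
order = order_rules({
    "PredicatePushdown": {"ProjectionPushdown"},
    "ProjectionPushdown": set(),
    "LimitPushdownRule": set(),
})
```

`static_order()` raises `CycleError` on circular dependencies, which is the failure mode a real Ruleset would also need to reject.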
DAG Traversal: _apply_transform
All operator classes provide:

```python
def _apply_transform(self, transform: Callable[[Operator], Operator]) -> Operator: ...
```

which:
Recursively processes all input_dependencies.
If any child is rewritten, copies the current node and replaces its inputs.
Applies transform to the (possibly copied) node and returns the result.
This post‑order (child → parent) traversal guarantees each node sees an already‑transformed sub‑graph, enabling push‑down, fusion, and other rule implementations.
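The traversal can be sketched as follows. `Operator` here is a toy class, not Ray's; only `input_dependencies` and the copy-on-write step are modeled:

```python
from typing import Callable, List, Optional


class Operator:
    def __init__(self, name: str, inputs: Optional[List["Operator"]] = None):
        self.name = name
        self.input_dependencies: List["Operator"] = inputs or []

    def _apply_transform(
        self, transform: Callable[["Operator"], "Operator"]
    ) -> "Operator":
        # 1. Recurse into children first (post-order).
        new_inputs = [c._apply_transform(transform) for c in self.input_dependencies]
        node = self
        # 2. If any child was rewritten, copy this node with the new inputs.
        if any(n is not o for n, o in zip(new_inputs, self.input_dependencies)):
            node = Operator(self.name, new_inputs)
        # 3. Transform the (possibly copied) node itself.
        return transform(node)


# Example: rewrite the "Limit" node; the parent is copied automatically.
read = Operator("Read")
root = Operator("Map", [Operator("Limit", [read])])
new_root = root._apply_transform(
    lambda op: Operator("Limit(5)", op.input_dependencies) if op.name == "Limit" else op
)
```

Note that the original DAG is left untouched; rewrites produce fresh nodes along the path from the changed node up to the root.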
Logical Ruleset
```python
_LOGICAL_RULESET = Ruleset([
    InheritBatchFormatRule,
    LimitPushdownRule,
    ProjectionPushdown,
    PredicatePushdown,
    CombineShuffles,
])
```
InheritBatchFormatRule
Purpose : For operators derived from AbstractAllToAll (e.g., Repartition, RandomShuffle, Sort), inherit the batch_format from the upstream MapBatches to avoid unnecessary format conversion.
Mechanism : Walk upstream from an AbstractAllToAll node via input_dependencies[0]. When a MapBatches with a defined batch_format is found, copy the AllToAll node and set its batch_format accordingly.
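The upstream walk can be sketched as follows. `Op` is a toy operator class, and matching on the name string stands in for an isinstance check against MapBatches:

```python
from typing import List, Optional


class Op:
    def __init__(self, name, inputs=None, batch_format=None):
        self.name = name
        self.input_dependencies: List["Op"] = inputs or []
        self.batch_format = batch_format


def inherit_batch_format(all_to_all: Op) -> Op:
    """Walk upstream via input_dependencies[0]; on finding a MapBatches
    with a batch_format, return a copy carrying that format."""
    node: Optional[Op] = (
        all_to_all.input_dependencies[0] if all_to_all.input_dependencies else None
    )
    while node is not None:
        if node.name == "MapBatches" and node.batch_format is not None:
            return Op(all_to_all.name, all_to_all.input_dependencies, node.batch_format)
        node = node.input_dependencies[0] if node.input_dependencies else None
    return all_to_all  # no upstream format found: unchanged


read = Op("Read")
mb = Op("MapBatches", [read], batch_format="pandas")
optimized = inherit_batch_format(Op("Sort", [mb]))
```

Because the rule returns a copy, the original Sort node is untouched, consistent with the copy-on-write traversal described above.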
Example :
```
# before optimization
Read → MapBatches(fn, batch_format="pandas") → Sort() → MapBatches(...)
# after optimization
Read → MapBatches(fn, batch_format="pandas") → Sort(batch_format="pandas") → MapBatches(...)
```
LimitPushdownRule
Purpose : Push Limit as close to the data source as possible, merge consecutive limits, and push limits through Union branches.
Pushable operators: those that do not change row count (e.g., Project, MapRows, Union).
Non‑pushable operators: those that change row count or order (e.g., Sort, Shuffle, Aggregate, Read).
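The push-down logic can be sketched on a linear chain of operators, modeled as (name, arg) tuples ordered downstream→upstream. This is a toy model, not Ray's DAG rewrite, and Union handling (cloning the limit into each branch) is omitted:

```python
# Operators a Limit may slide past: they preserve row count.
ROW_PRESERVING = {"Project", "MapRows"}


def push_limit(chain):
    """Push and merge Limit entries toward the data source in a
    downstream→upstream chain of (name, arg) tuples."""
    result = []
    pending = None  # smallest limit seen so far, not yet re-inserted
    for name, arg in chain:
        if name == "Limit":
            # Merge consecutive limits: the smaller one wins.
            pending = arg if pending is None else min(pending, arg)
        elif name in ROW_PRESERVING:
            result.append((name, arg))  # the limit keeps sliding past
        else:
            # Non-pushable (Sort, Shuffle, Aggregate, Read, ...):
            # drop the limit back in right before this operator.
            if pending is not None:
                result.append(("Limit", pending))
                pending = None
            result.append((name, arg))
    if pending is not None:
        result.append(("Limit", pending))
    return result
```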
Examples :
```
# merge consecutive limits
Limit(100) → Limit(50) => Limit(50)
# push through Project
Limit(10) → Project(select=["a","b"]) → Read => Project(select=["a","b"]) → Limit(10) → Read
# push through Union
Limit(5) → Union(A, B) => Limit(5) → Union( Limit(5)→A , Limit(5)→B )
```
ProjectionPushdown
Purpose : Merge consecutive Project operators and push projections into data sources that support column pruning.
Merge : analyze upstream and downstream Project expressions, replace downstream column references with upstream ones, and combine into a single Project.
Push‑down : if the downstream Read implements LogicalOperatorSupportsProjectionPushdown, request only needed columns; otherwise keep the Project for expression evaluation.
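Both the merge and the push-down decision can be sketched with plain lists of column names. This is a simplified model in which `LogicalOperatorSupportsProjectionPushdown` is reduced to a boolean flag:

```python
def merge_projections(downstream_cols, upstream_cols):
    """Fuse select(upstream_cols) → select(downstream_cols) into a single
    projection that keeps only the downstream columns."""
    missing = set(downstream_cols) - set(upstream_cols)
    if missing:
        raise ValueError(f"columns not produced upstream: {missing}")
    return list(downstream_cols)


def push_into_read(read_supports_pruning, cols):
    """Return (read_columns, remaining_project): when the source supports
    column pruning, the Project is absorbed into the Read."""
    if read_supports_pruning:
        return cols, None
    return None, cols
```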
Examples :
```
# fuse two Projects
select(["a","b"]) → select(["a"]) => select(["a"])   # keep only one Project
# rename + select fusion
rename({"x":"y"}) → select(["y"]) => Project that reads "x" and outputs column "y"
# push into Read (parquet) when supported
Read(parquet) → select(["col1","col2"]) => Read(parquet, columns=["col1","col2"])
```
PredicatePushdown
Purpose : Combine consecutive Filter operators and push predicates toward the data source using the operator’s declared capabilities.
Combine : Filter(A) → Filter(B) becomes Filter(A & B).
Push‑down : based on LogicalOperatorSupportsPredicatePassThrough variants (PASSTHROUGH, PUSH_INTO_BRANCHES, etc.), move filters downstream, possibly into the storage layer (e.g., Parquet predicate push‑down).
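The combine step and the rename rewrite can be sketched with ordinary Python predicates over row dicts, as an illustration of the idea rather than Ray's expression objects:

```python
def combine_filters(predicates):
    """Filter(A) → Filter(B) becomes a single Filter(A & B)."""
    return lambda row: all(p(row) for p in predicates)


def filter_column_before_rename(pred_col, rename_map):
    """To run a filter before rename({"x": "y"}), rewrite a reference to
    the downstream name back to the upstream one (y → x)."""
    inverse = {new: old for old, new in rename_map.items()}
    return inverse.get(pred_col, pred_col)


combined = combine_filters([lambda r: r["a"] > 0, lambda r: r["b"] < 10])
```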
Examples :
```
# merge Filters
Filter(col("a") > 0) → Filter(col("b") < 10) => Filter((col("a") > 0) & (col("b") < 10))
# push through rename
Filter(col("y") > 0) → rename({"x":"y"}) => rename({"x":"y"}) → Filter(col("x") > 0)
# push into Read when supported
Read(parquet) → Filter(col("date") > "2020-01-01") => Read(parquet, predicate=...)
```
CombineShuffles
Purpose : Collapse consecutive shuffle‑type operators into a single operator to reduce unnecessary partitioning and network traffic.
Repartition → Repartition becomes one Repartition, preserving the downstream settings.
StreamingRepartition → StreamingRepartition becomes one StreamingRepartition.
Repartition → Aggregate is replaced by an Aggregate that handles its own partitioning.
Sort → Sort collapses into a single Sort using the downstream key.
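The merge of consecutive same-type shuffles can be sketched on a flat operator list. This is a toy model; the Repartition → Aggregate rewrite is omitted:

```python
SHUFFLE_OPS = {"Repartition", "StreamingRepartition", "Sort"}


def combine_shuffles(ops):
    """Collapse runs of the same shuffle operator, keeping the downstream
    (later) operator's argument, e.g. Repartition(20) → Repartition(10)
    becomes Repartition(10)."""
    result = []
    for name, arg in ops:
        if result and name in SHUFFLE_OPS and result[-1][0] == name:
            result[-1] = (name, arg)  # downstream settings win
        else:
            result.append((name, arg))
    return result
```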
Example :
```
# two repartitions merged
Read → Repartition(20) → Repartition(10) => Read → Repartition(10)
# Repartition + Aggregate merged
Read → Repartition(8) → Aggregate(...) => Read → Aggregate(..., handles partitioning itself)
```
Physical Ruleset (Brief)
```python
_PHYSICAL_RULESET = Ruleset([
    InheritTargetMaxBlockSizeRule,
    SetReadParallelismRule,
    FuseOperators,
    ConfigureMapTaskMemoryUsingOutputSize,
])
```
Key physical rules include:
Propagating a target block size upstream so that all operators share a consistent block size.
Setting read parallelism based on cluster resources, target block size, and data‑source capabilities.
Fusing compatible physical operators (e.g., consecutive maps or map‑to‑shuffle chains) to reduce task count.
Estimating map‑task memory from average output size when the user has not specified memory, mitigating OOM risk.
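Operator fusion in particular can be sketched by composing consecutive map functions, so that each block passes through one task instead of a chain. This is a toy model of the idea, not Ray's FuseOperators rule:

```python
def fuse_maps(ops):
    """Fuse consecutive ("Map", fn) entries into one by composing their
    functions; other operator kinds are left untouched."""
    result = []
    for kind, fn in ops:
        if result and kind == "Map" and result[-1][0] == "Map":
            prev = result[-1][1]
            # Bind prev/fn as defaults so each fused lambda captures its
            # own pair of functions.
            result[-1] = ("Map", lambda x, f=prev, g=fn: g(f(x)))
        else:
            result.append((kind, fn))
    return result


pipeline = fuse_maps([
    ("Read", None),
    ("Map", lambda x: x + 1),
    ("Map", lambda x: x * 2),
])
```

After fusion the two map stages run as one, which is the task-count reduction the physical rule is after.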
From Logical Plan to Execution (get_execution_plan)
```python
def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    optimize_logical, plan, optimize_physical = get_plan_conversion_fns()
    optimized_logical_plan = optimize_logical(logical_plan)  # 1. logical optimization
    logical_plan._dag = optimized_logical_plan.dag           # 2. write back the optimized DAG
    physical_plan = plan(optimized_logical_plan)             # 3. logical → physical
    return optimize_physical(physical_plan)                  # 4. physical optimization
```
The LogicalOptimizer receives a user‑constructed LogicalPlan and outputs a DAG that has undergone multiple passes of InheritBatchFormat, LimitPushdown, ProjectionPushdown, PredicatePushdown, and CombineShuffles, providing a superior starting point for planning and physical optimization.
Summary
LogicalOptimizer : repeatedly applies logical rules to a LogicalPlan’s DAG until a fixed point is reached.
Ruleset : manages rule dependencies and guarantees correct application order.
_apply_transform : post‑order DAG traversal that allows node replacement, enabling push‑down and fusion.
Logical Rules : batch‑format inheritance, limit/projection/predicate push‑down, and shuffle combination reduce data volume and redundancy at the logical level.
Physical Rules : block‑size propagation, read parallelism tuning, operator fusion, and map‑task memory configuration improve execution efficiency.
Big Data Technology Tribe