
How Ray Data’s LogicalOptimizer Transforms Plans for Faster Execution

This article explains Ray Data’s execution pipeline, detailing the LogicalOptimizer’s architecture, core abstractions, rule‑based optimization process, and both logical and physical rule sets, with concrete code examples and practical illustrations of each optimization technique.


Overall Architecture

Ray Data executes a plan through four stages: logical plan → logical optimization → physical plan → physical optimization. The LogicalOptimizer rewrites the logical DAG using a series of rules without changing semantics, preparing it for downstream planning and physical optimization.

LogicalPlan (DAG of LogicalOperator)
    │
    ▼ LogicalOptimizer.optimize(plan)
    │   └─ repeatedly applies all rules in order until the DAG stops changing
    │
Optimized LogicalPlan
    │
    ▼ Planner.plan(...)  # logical operators → physical operators
    │
PhysicalPlan (DAG of PhysicalOperator)
    │
    ▼ PhysicalOptimizer.optimize(plan)
    │
Optimized PhysicalPlan → execution

Logical Optimizer Principles and Flow

Core Abstractions

Plan : abstract plan holding the DAG root and context (e.g., DataContext).

LogicalPlan : DAG composed of LogicalOperator nodes.

Rule : optimization rule with apply(plan) → plan and optional dependencies() / dependents() to control ordering.

Optimizer : holds a set of Rule objects and repeatedly applies them until a fixed point is reached.

Ruleset : ordered collection of rules sorted by a dependency graph.
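The dependency-driven ordering of a Ruleset can be sketched with a plain topological sort. The `Rule`, `RuleA`, `RuleB`, and `topo_sort_rules` names below are illustrative stand-ins, not Ray Data's actual implementation:

```python
from collections import defaultdict, deque

class Rule:
    """Base rule: subclasses override apply(); dependencies() lists rules
    that must run earlier in every optimization round."""
    @classmethod
    def dependencies(cls):
        return []

    def apply(self, plan):
        return plan

class RuleA(Rule):
    pass

class RuleB(Rule):
    @classmethod
    def dependencies(cls):
        return [RuleA]  # RuleB must run after RuleA

def topo_sort_rules(rules):
    """Order rules so each one runs after all of its declared dependencies."""
    indegree = {r: 0 for r in rules}
    dependents = defaultdict(list)
    for r in rules:
        for dep in r.dependencies():
            if dep in indegree:          # ignore deps outside this ruleset
                dependents[dep].append(r)
                indegree[r] += 1
    queue = deque(r for r in rules if indegree[r] == 0)
    order = []
    while queue:
        r = queue.popleft()
        order.append(r)
        for d in dependents[r]:
            indegree[d] -= 1
            if indegree[d] == 0:
                queue.append(d)
    if len(order) != len(rules):
        raise ValueError("cyclic rule dependencies")
    return order

# Even if registered out of order, RuleA is sorted before its dependent RuleB.
print([r.__name__ for r in topo_sort_rules([RuleB, RuleA])])  # ['RuleA', 'RuleB']
```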

Optimization Process (Optimizer.optimize)

def optimize(self, plan: Plan) -> Plan:
    previous_plan = plan
    while True:
        for rule in self.rules:  # applied in Ruleset order
            plan = rule.apply(plan)
        if plan.dag.dag_str == previous_plan.dag.dag_str:
            break  # fixed point: DAG string unchanged
        previous_plan = plan
    return plan

Key points :

Multiple iterations: each round applies all rules in a fixed order until the DAG string stops changing.

Rule order is derived from LogicalOptimizer.rules via get_logical_ruleset(), which builds a dependency graph and produces a topological order.

Inside a rule, plan.dag._apply_transform(transform) performs a post‑order traversal, applying transform(node) → node and copying nodes when children change, ensuring upstream nodes are already optimized.

DAG Traversal: _apply_transform

All operator classes provide

def _apply_transform(self, transform: Callable[[Operator], Operator]) -> Operator:

which:

Recursively processes all input_dependencies.

If any child is rewritten, copies the current node and replaces its inputs.

Applies transform to the (possibly copied) node and returns the result.

This post‑order (child → parent) traversal guarantees each node sees an already‑transformed sub‑graph, enabling push‑down, fusion, and other rule implementations.
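A minimal sketch of this copy-on-write, post-order traversal (the `Operator` class here is a simplified stand-in for Ray Data's operator base class):

```python
from typing import Callable, List, Optional

class Operator:
    def __init__(self, name: str, inputs: Optional[List["Operator"]] = None):
        self.name = name
        self.input_dependencies = inputs or []

    def _apply_transform(
        self, transform: Callable[["Operator"], "Operator"]
    ) -> "Operator":
        # 1. Post-order: transform all children first.
        new_inputs = [c._apply_transform(transform) for c in self.input_dependencies]
        node = self
        # 2. Copy-on-write: copy this node only if a child was rewritten.
        if any(n is not o for n, o in zip(new_inputs, self.input_dependencies)):
            node = Operator(self.name, new_inputs)
        # 3. Apply the transform to the (possibly copied) node itself.
        return transform(node)

# Example: rewrite every Limit node; upstream Read is visited first.
read = Operator("Read")
limit = Operator("Limit", [read])

def rename_limit(op: Operator) -> Operator:
    if op.name == "Limit":
        return Operator("Limit(10)", op.input_dependencies)
    return op

new_root = limit._apply_transform(rename_limit)
print(new_root.name)  # Limit(10)
```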

Logical Ruleset

_LOGICAL_RULESET = Ruleset([
    InheritBatchFormatRule,
    LimitPushdownRule,
    ProjectionPushdown,
    PredicatePushdown,
    CombineShuffles,
])

InheritBatchFormatRule

Purpose : For operators derived from AbstractAllToAll (e.g., Repartition, RandomShuffle, Sort), inherit the batch_format from the upstream MapBatches to avoid unnecessary format conversion.

Mechanism : Walk upstream from an AbstractAllToAll node via input_dependencies[0]. When a MapBatches with a defined batch_format is found, copy the AllToAll node and set its batch_format accordingly.

Example :

# before optimization
Read → MapBatches(fn, batch_format="pandas") → Sort() → MapBatches(...)

# after optimization
Read → MapBatches(fn, batch_format="pandas") → Sort(batch_format="pandas") → MapBatches(...)
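The upstream walk can be sketched as follows; `Op` and `inherit_batch_format` are simplified stand-ins for the real operator classes and rule:

```python
from typing import List, Optional

class Op:
    def __init__(self, name: str, inputs: Optional[List["Op"]] = None,
                 batch_format: Optional[str] = None):
        self.name = name
        self.input_dependencies = inputs or []
        self.batch_format = batch_format

def inherit_batch_format(all_to_all: Op) -> Op:
    """Walk upstream via input_dependencies[0] until a MapBatches with a
    concrete batch_format is found, then copy the node with that format."""
    upstream = all_to_all.input_dependencies[0] if all_to_all.input_dependencies else None
    while upstream is not None:
        if upstream.name == "MapBatches" and upstream.batch_format is not None:
            return Op(all_to_all.name, all_to_all.input_dependencies,
                      batch_format=upstream.batch_format)
        upstream = (upstream.input_dependencies[0]
                    if upstream.input_dependencies else None)
    return all_to_all  # no upstream format found: leave the node unchanged

read = Op("Read")
mb = Op("MapBatches", [read], batch_format="pandas")
sort = Op("Sort", [mb])
print(inherit_batch_format(sort).batch_format)  # pandas
```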

LimitPushdownRule

Purpose : Push Limit as close to the data source as possible, merge consecutive limits, and push limits through Union branches.

Pushable operators: those that do not change row count (e.g., Project, MapRows, Union).

Non‑pushable operators: those that change row count or order (e.g., Sort, Shuffle, Aggregate, Read).

Examples :

# merge consecutive limits
Limit(100) → Limit(50)  =>  Limit(50)

# push through Project
Read → Project(select=["a","b"]) → Limit(10)  =>  Read → Limit(10) → Project(select=["a","b"])

# push through Union
Union(A, B) → Limit(5)  =>  Union(A → Limit(5), B → Limit(5)) → Limit(5)
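A toy version of the pushdown over a linear operator chain, assuming a simplified `Node` representation (child points upstream, toward the source):

```python
from typing import Optional

# Operators that preserve row count, so a Limit may hop over them.
ROW_PRESERVING = {"Project", "MapRows"}

class Node:
    def __init__(self, name: str, child: Optional["Node"] = None,
                 n: Optional[int] = None):
        self.name, self.child, self.n = name, child, n

def push_limit(node: Node) -> Node:
    """Push a Limit toward the source: merge consecutive limits (keeping the
    smaller count) and swap the Limit below row-preserving operators."""
    if node.name != "Limit":
        return node
    child = node.child
    if child and child.name == "Limit":           # Limit over Limit: merge
        return push_limit(Node("Limit", child.child, min(node.n, child.n)))
    if child and child.name in ROW_PRESERVING:    # hop over the operator
        pushed = push_limit(Node("Limit", child.child, node.n))
        return Node(child.name, pushed)
    return node

# Read → Project → Limit(10)  becomes  Read → Limit(10) → Project
plan = Node("Limit", Node("Project", Node("Read")), 10)
out = push_limit(plan)
print(out.name, "over", out.child.name, out.child.n)  # Project over Limit 10
```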

ProjectionPushdown

Purpose : Merge consecutive Project operators and push projections into data sources that support column pruning.

Merge : analyze upstream and downstream Project expressions, replace downstream column references with upstream ones, and combine into a single Project.

Push‑down : if the downstream Read implements LogicalOperatorSupportsProjectionPushdown, request only needed columns; otherwise keep the Project for expression evaluation.

Examples :

# fuse two Projects
select(["a","b"]) → select(["a"])  =>  select(["a"])   # keep only one Project

# rename + select fusion
rename({"x":"y"}) → select(["y"])  =>  Project that reads "x" and outputs column "y"

# push into Read (parquet) when supported
Read(parquet) → select(["col1","col2"])  =>  Read(parquet, columns=["col1","col2"])
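The merge step can be modeled as resolving each downstream output column back to the source expression produced by the upstream Project. The `fuse_projects` helper below is a simplified illustration, representing a Project as a mapping from output column to source column:

```python
def fuse_projects(upstream: dict, downstream_cols: list) -> dict:
    """Fuse an upstream Project (output column -> source column, covering
    both selects and renames) with a downstream column selection: each
    selected column is resolved back to the source column it reads."""
    return {col: upstream[col] for col in downstream_cols}

# select(["a","b"]) → select(["a"])  collapses to one Project reading "a"
print(fuse_projects({"a": "a", "b": "b"}, ["a"]))   # {'a': 'a'}

# rename({"x":"y"}) → select(["y"])  reads source "x", outputs column "y"
print(fuse_projects({"y": "x"}, ["y"]))             # {'y': 'x'}
```

The resulting mapping is exactly what a column-pruning Read needs: the set of source columns on the right-hand side.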

PredicatePushdown

Purpose : Combine consecutive Filter operators and push predicates toward the data source using the operator’s declared capabilities.

Combine : Filter(A) → Filter(B) becomes Filter(A & B).

Push‑down : based on LogicalOperatorSupportsPredicatePassThrough variants (PASSTHROUGH, PUSH_INTO_BRANCHES, etc.), move filters upstream toward the data source, possibly into the storage layer (e.g., Parquet predicate push‑down).

Examples :

# merge Filters
Filter(col("a") > 0) → Filter(col("b") < 10)  =>  Filter((col("a") > 0) & (col("b") < 10))

# push through rename
rename({"x":"y"}) → Filter(col("y") > 0)  =>  Filter(col("x") > 0) → rename({"x":"y"})

# push into Read when supported
Read(parquet) → Filter(col("date") > "2020-01-01")  =>  Read(parquet, predicate=...)
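Rewriting a predicate across a rename amounts to mapping its column reference back through the inverse rename, so the filter can run before the rename. `Pred` and `push_through_rename` are illustrative stand-ins:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Pred:
    column: str
    op: str
    value: Any

def push_through_rename(pred: Pred, rename: dict) -> Pred:
    """Rewrite a predicate on a renamed column back to its source column
    (rename maps source -> output, e.g. {"x": "y"})."""
    inverse = {out: src for src, out in rename.items()}
    return Pred(inverse.get(pred.column, pred.column), pred.op, pred.value)

# Filter(col("y") > 0) moved below rename({"x":"y"}) filters on "x" instead.
p = push_through_rename(Pred("y", ">", 0), {"x": "y"})
print(p)  # Pred(column='x', op='>', value=0)
```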

CombineShuffles

Purpose : Collapse consecutive shuffle‑type operators into a single operator to reduce unnecessary partitioning and network traffic.

Patterns :

Repartition → Repartition becomes a single Repartition preserving the downstream settings.

StreamingRepartition → StreamingRepartition becomes a single StreamingRepartition.

Repartition → Aggregate is replaced by an Aggregate that handles its own partitioning.

Sort → Sort collapses into a single Sort using the downstream sort key.

Example :

# two repartitions merged
Read → Repartition(20) → Repartition(10)  =>  Read → Repartition(10)

# Repartition + Aggregate merged
Read → Repartition(8) → Aggregate(...)  =>  Read → Aggregate(..., handles partitioning itself)
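The Repartition → Repartition case can be sketched on a flat chain representation, where the downstream partition count wins and the upstream repartition is dropped as wasted work:

```python
def combine_repartitions(chain: list) -> list:
    """Collapse consecutive Repartition steps in an (op_name, num_partitions)
    chain, keeping only the downstream (last) partition count."""
    out = []
    for op, n in chain:
        if out and out[-1][0] == "Repartition" and op == "Repartition":
            out[-1] = (op, n)   # downstream setting wins
        else:
            out.append((op, n))
    return out

chain = [("Read", None), ("Repartition", 20), ("Repartition", 10)]
print(combine_repartitions(chain))  # [('Read', None), ('Repartition', 10)]
```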

Physical Ruleset (Brief)

_PHYSICAL_RULESET = Ruleset([
    InheritTargetMaxBlockSizeRule,
    SetReadParallelismRule,
    FuseOperators,
    ConfigureMapTaskMemoryUsingOutputSize,
])

Key physical rules include:

Propagating a target block size upstream so that all operators share a consistent block size.

Setting read parallelism based on cluster resources, target block size, and data‑source capabilities.

Fusing compatible physical operators (e.g., consecutive maps or map‑to‑shuffle chains) to reduce task count.

Estimating map‑task memory from average output size when the user has not specified memory, mitigating OOM risk.
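The memory-estimation idea can be approximated as average output size times a headroom factor, with a lower bound; the factor and floor values below are illustrative assumptions, not Ray Data's actual defaults:

```python
def estimate_map_task_memory(avg_output_bytes: float,
                             safety_factor: float = 2.0,
                             min_memory: int = 256 * 1024 * 1024) -> int:
    """Derive a map-task memory request from the observed average output
    size when the user has not specified memory explicitly."""
    return max(int(avg_output_bytes * safety_factor), min_memory)

# Tasks averaging ~1 GiB of output get a 2 GiB memory request.
print(estimate_map_task_memory(1 * 1024**3))  # 2147483648
```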

From Logical Plan to Execution (get_execution_plan)

def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    optimize_logical, plan, optimize_physical = get_plan_conversion_fns()
    optimized_logical_plan = optimize_logical(logical_plan)  # 1. logical optimization
    logical_plan._dag = optimized_logical_plan.dag          # 2. write back DAG
    physical_plan = plan(optimized_logical_plan)           # 3. logical → physical
    return optimize_physical(physical_plan)                # 4. physical optimization

The LogicalOptimizer receives a user‑constructed LogicalPlan and outputs a DAG that has undergone multiple passes of InheritBatchFormat, LimitPushdown, ProjectionPushdown, PredicatePushdown, and CombineShuffles, providing a superior starting point for planning and physical optimization.

Summary

LogicalOptimizer : repeatedly applies logical rules to a LogicalPlan’s DAG until a fixed point is reached.

Ruleset : manages rule dependencies and guarantees correct application order.

_apply_transform : post‑order DAG traversal that allows node replacement, enabling push‑down and fusion.

Logical Rules : batch‑format inheritance, limit/projection/predicate push‑down, and shuffle combination reduce data volume and redundancy at the logical level.

Physical Rules : block‑size propagation, read parallelism tuning, operator fusion, and map‑task memory configuration improve execution efficiency.

Tags: big data, Query Optimization, Distributed Computing, Ray Data, Logical Optimizer
Written by Big Data Technology Tribe

Focused on computer science and cutting‑edge tech, we distill complex knowledge into clear, actionable insights. We track tech evolution, share industry trends and deep analysis, helping you keep learning, boost your technical edge, and ride the digital wave forward.
