How Ray Data Turns Logical Plans into Executable Workflows – A Deep Dive
This article provides a step‑by‑step explanation of Ray Data's LogicalPlan architecture: its class hierarchy, core methods, logical operators, optimization rules, logical‑to‑physical planning, execution binding, metadata inference, lineage serialization, and a file/module index for developers building scalable data pipelines.
Overview
Ray Data uses a LogicalPlan to represent the logical execution plan of a dataset. The plan consists of a DAG of LogicalOperator nodes and a DataContext that carries configuration through optimization and execution.
LogicalPlan Definition
LogicalPlan is a thin wrapper around a LogicalOperator DAG (_dag) and a DataContext. It provides methods such as dag, dag_str, post_order_iter(), _apply_transform(), and sources() for DAG inspection and transformation.
Class Hierarchy
class Plan
├─ LogicalPlan
└─ PhysicalPlan
class Operator
├─ LogicalOperator (adds num_outputs, estimated_num_outputs(), infer_schema(), infer_metadata(), is_lineage_serializable())
└─ PhysicalOperator (execution‑specific implementations)
Core Methods
dag: returns the root LogicalOperator (the most downstream node).
dag_str: recursively builds a string like Read[ReadParquet] -> Filter -> Limit for debugging and optimizer convergence checks.
post_order_iter(): depth‑first post‑order traversal yielding child operators before their parent.
_apply_transform(transform): recursively applies a transformation function to the DAG, copying nodes only when needed to preserve immutability.
sources(): collects all leaf source operators (those with no input_dependencies).
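The traversal helpers above can be sketched with a toy operator class. Op, post_order_iter, dag_str, and sources here are illustrative stand‑ins for a simple linear chain, not Ray Data's actual implementations:

```python
from __future__ import annotations
from dataclasses import dataclass, field

# Hypothetical stand-in for LogicalOperator: each node only knows its
# upstream inputs, and the plan references the most-downstream node.
@dataclass
class Op:
    name: str
    input_dependencies: list["Op"] = field(default_factory=list)

def post_order_iter(op: Op):
    """Yield child operators before their parent."""
    for child in op.input_dependencies:
        yield from post_order_iter(child)
    yield op

def dag_str(op: Op) -> str:
    """Render a linear chain, e.g. 'Read -> Filter -> Limit'."""
    return " -> ".join(n.name for n in post_order_iter(op))

def sources(op: Op) -> list[Op]:
    """Leaf operators with no input_dependencies."""
    return [n for n in post_order_iter(op) if not n.input_dependencies]

read = Op("Read")
limit = Op("Limit", [Op("Filter", [read])])
print(dag_str(limit))   # Read -> Filter -> Limit
```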
Logical Operators
Key logical operators include:
Read – external data source (files, databases).
FromItems, FromArrow, FromPandas, FromNumpy – in‑memory sources.
InputData – wraps existing RefBundle objects.
MapRows, MapBatches, FlatMap, Filter, Project, StreamingRepartition – row or batch transformations.
Limit, Count – one‑to‑one operators.
Repartition, RandomShuffle, Sort, Aggregate – all‑to‑all (shuffle) operators.
Union, Zip, Join – multi‑input operators.
Write, StreamingSplit, Download – output operators.
LogicalPlan Construction
Every user‑facing API (e.g., ray.data.read_parquet(), ds.map(), ds.filter()) creates a new LogicalOperator whose input_dependencies point to the current plan's DAG root. A new LogicalPlan is then instantiated with this operator, leaving the original plan unchanged.
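This copy‑on‑extend construction can be sketched with frozen dataclasses. Op, LogicalPlan, and with_operator are illustrative names, not Ray's API; the point is that each call builds a new root whose input points at the previous root, leaving the old plan intact:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Op:
    name: str
    input_dependencies: tuple = ()

@dataclass(frozen=True)
class LogicalPlan:
    dag: Op  # root (most-downstream) operator

def with_operator(plan: LogicalPlan, name: str) -> LogicalPlan:
    """Return a NEW plan; the old plan is never mutated."""
    return LogicalPlan(Op(name, (plan.dag,)))

p0 = LogicalPlan(Op("Read"))
p1 = with_operator(p0, "Filter")  # p0 is untouched
p2 = with_operator(p1, "Limit")

assert p0.dag.name == "Read"                 # original unchanged
assert p2.dag.input_dependencies[0] is p1.dag  # new root reuses old DAG
```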
LogicalPlan Generation Pipeline
# optimizers.py
def get_execution_plan(logical_plan: LogicalPlan) -> PhysicalPlan:
    optimize_logical, plan, optimize_physical = get_plan_conversion_fns()
    # 1. Logical optimization
    optimized_logical_plan = optimize_logical(logical_plan)
    logical_plan._dag = optimized_logical_plan.dag
    # 2. Logical → Physical planning
    physical_plan = plan(optimized_logical_plan)
    # 3. Physical optimization
    return optimize_physical(physical_plan)
Optimizer Framework
Both LogicalOptimizer and PhysicalOptimizer inherit from Optimizer, which repeatedly applies a list of Rule objects until the DAG string stops changing (plan.dag.dag_str == previous_plan.dag.dag_str).
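The fixed‑point loop can be sketched in a few lines. For illustration only, the "rules" here are plain string rewrites over a dag_str‑style rendering rather than real DAG transformations:

```python
# Minimal fixed-point loop in the spirit of Optimizer: apply every rule,
# then check whether the plan's string form changed; stop once it is stable.
def optimize(dag_str: str, rules) -> str:
    while True:
        before = dag_str
        for rule in rules:
            dag_str = rule(dag_str)
        if dag_str == before:  # converged: no rule changed the DAG
            return dag_str

# Toy rule: collapse consecutive Limit operators into one.
def merge_limits(s: str) -> str:
    return s.replace("Limit -> Limit", "Limit")

print(optimize("Read -> Limit -> Limit -> Limit", [merge_limits]))
# Read -> Limit
```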
Logical Optimizer Rules
InheritBatchFormatRule: propagates batch_format from the nearest MapBatches upstream to all‑to‑all operators.
LimitPushdownRule: pushes Limit toward the source, merging consecutive limits and setting per_block_limit on Read when possible.
ProjectionPushdown: merges consecutive Project nodes, substitutes column references, and pushes column selection into sources that implement LogicalOperatorSupportsProjectionPushdown.
PredicatePushdown: merges consecutive Filter nodes, pushes predicates through operators that declare LogicalOperatorSupportsPredicatePassThrough, and finally pushes them into sources supporting LogicalOperatorSupportsPredicatePushdown.
CombineShuffles: removes redundant shuffle operators (e.g., consecutive Repartition).
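The limit‑merging step of LimitPushdownRule rests on a simple equivalence: two stacked limits equal one limit of the smaller value. A sketch with an illustrative Limit stand‑in (not Ray's operator class):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Limit:
    limit: int
    input: Optional["Limit"] = None  # upstream operator, if any

def merge_limits(op: Limit) -> Limit:
    """Collapse Limit(m) over Limit(n) into a single Limit(min(m, n))."""
    while isinstance(op.input, Limit):
        op = Limit(min(op.limit, op.input.limit), op.input.input)
    return op

merged = merge_limits(Limit(5, Limit(10)))
assert merged.limit == 5 and merged.input is None
```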
Planner (Logical → Physical)
The Planner holds a mapping _DEFAULT_PLAN_FNS from logical operator classes to planning functions (e.g., Read → plan_read_op, AbstractUDFMap → plan_udf_map_op, AbstractAllToAll → plan_all_to_all_op, Union → plan_union_op, etc.). It recursively traverses the logical DAG in post‑order, creates the corresponding physical operator, wires output dependencies, and builds an op_map linking physical to logical nodes.
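The dispatch‑table pattern can be sketched as follows. Class and function names (Read, Filter, plan_read_op, PLAN_FNS) are illustrative stand‑ins, not Ray's actual signatures; the essence is a dict keyed by logical operator class and a bottom‑up translation:

```python
# Toy logical operators: Read is a leaf, Filter wraps one input.
class Read: ...
class Filter:
    def __init__(self, inp): self.input = inp

# Per-operator planning functions receive the already-planned inputs.
def plan_read_op(op, planned_inputs): return "ReadTasks"
def plan_filter_op(op, planned_inputs): return f"Map({planned_inputs[0]})"

PLAN_FNS = {Read: plan_read_op, Filter: plan_filter_op}

def plan(op):
    """Translate the logical DAG in post-order: children first, then parent."""
    children = [plan(op.input)] if getattr(op, "input", None) else []
    return PLAN_FNS[type(op)](op, children)

print(plan(Filter(Read())))  # Map(ReadTasks)
```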
Physical Optimizer Rules
InheritTargetMaxBlockSizeRule: propagates target_max_block_size downstream until another operator overrides it.
SetReadParallelismRule: computes the final parallelism for Read based on target block size, file count, and cluster resources.
FuseOperators: merges adjacent MapOperator nodes when compute strategies and remote args are compatible, and also fuses MapOperator → AllToAllOperator when possible.
ConfigureMapTaskMemoryUsingOutputSize: sets ray_remote_args_fn on map tasks so that memory is allocated based on estimated output block size.
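The core idea behind map fusion is function composition: two compatible per‑row transforms run in a single task instead of materializing an intermediate result between them. A minimal sketch of that idea (not Ray's FuseOperators implementation):

```python
def fuse(f, g):
    """Compose two map functions into one: apply f, then g, in a single pass."""
    return lambda row: g(f(row))

add_one = lambda x: x + 1
double = lambda x: x * 2

# Fused pipeline: one task per row instead of two, no intermediate block.
fused = fuse(add_one, double)
assert fused(3) == 8  # (3 + 1) * 2
```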
ExecutionPlan Binding and Execution
When a Dataset is created, its ExecutionPlan calls link_logical_plan(logical_plan), storing the plan and sharing the same DataContext. Consumption methods (take(), count(), write_*) invoke ExecutionPlan.execute(), which calls _get_execution_dag() to run the three‑step conversion and then hands the resulting physical DAG to the StreamingExecutor for actual task execution.
Metadata Inference (No Execution)
Methods such as plan.schema(), plan.meta_count(), and plan.initial_num_blocks() directly call the logical DAG's infer_schema(), infer_metadata().num_rows, and estimated_num_outputs() without triggering any computation.
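Such inference works by deriving metadata from each operator's input metadata instead of running tasks. A sketch for a Limit‑style operator, with Meta and infer_limit_metadata as illustrative stand‑ins (not Ray's infer_metadata() signature):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Meta:
    num_rows: Optional[int]  # None means "unknown without executing"

def infer_limit_metadata(input_meta: Meta, limit: int) -> Meta:
    """A limit caps the row count; an unknown count stays unknown."""
    if input_meta.num_rows is None:
        return Meta(None)
    return Meta(min(input_meta.num_rows, limit))

assert infer_limit_metadata(Meta(1000), 10).num_rows == 10
assert infer_limit_metadata(Meta(None), 10).num_rows is None
```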
Explain()
ExecutionPlan.explain() prints four sections: the original logical plan, the optimized logical plan, the physical plan, and the optimized physical plan, showing transformations such as limit pushdown, predicate pushdown, and operator fusion.
Lineage Serialization
Dataset lineage can be serialized if every operator returns True from is_lineage_serializable(). The process deep‑copies the plan, clears any materialized data, and pickles the dataset. Deserialization reconstructs the logical plan on a new cluster, allowing recomputation.
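The underlying requirement is that every node in the DAG survives a pickle round trip, so the plan can be rebuilt and recomputed on another cluster. A minimal sketch with an illustrative Op class (not Ray's serialization path, which also clears materialized data first):

```python
import pickle
from dataclasses import dataclass, field

# Stand-in for a lineage-serializable operator: plain data, no captured
# object refs or cluster state, so pickle can round-trip the whole DAG.
@dataclass
class Op:
    name: str
    inputs: list = field(default_factory=list)

plan = Op("Limit", [Op("Filter", [Op("Read")])])
restored = pickle.loads(pickle.dumps(plan))  # survives a round trip

assert restored.name == "Limit"
assert restored.inputs[0].inputs[0].name == "Read"
```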
File & Module Index
Key directories:
_internal/logical/interfaces/ – base classes (Operator, Plan, LogicalOperator, mixins for pushdown support).
_internal/logical/operators/ – concrete logical operators (read, map, filter, shuffle, join, etc.).
_internal/logical/rules/ – optimization rule implementations.
_internal/planner/ – planner and per‑operator planning functions.
_internal/execution/ – ExecutionPlan, legacy compatibility layer, and StreamingExecutor.
dataset.py, read_api.py, grouped_data.py – user‑facing APIs that build logical plans.
Summary
The LogicalPlan is the immutable, DAG‑based representation of a Ray Data workflow. It undergoes logical optimizations (limit, projection, predicate pushdown, shuffle merging), is translated to a physical DAG by the planner, and finally receives physical optimizations (operator fusion, parallelism tuning, memory configuration). Execution is lazy until a consumer triggers it, and the entire plan can be visualized, explained, or serialized for cross‑cluster reuse.