How Ray Data Streams Data: From Logical Plans to Distributed Execution
This deep‑dive explains how Ray Data turns user‑level Dataset APIs into a logical plan, optimizes that plan, lowers it to a physical streaming execution graph, and runs it on a cluster using task and actor pools, tracing each component from read sources to write sinks with code examples.
Introduction
Ray Data is a distributed data‑processing library built on top of Ray. It provides high‑performance, scalable APIs for AI workloads such as batch inference and data preprocessing. The library uses a streaming execution model to process datasets larger than the memory of a single machine, while keeping CPU and GPU resources busy concurrently.
Example Pipeline
import ray
from typing import Dict
from numpy.typing import NDArray

# Load a CSV dataset directly from NAS/S3
ds = ray.data.read_csv("/mnt/example-data/iris.csv")

# User-defined map class for per-row inference
class CustomMapActor:
    def __init__(self):
        print("init")

    def __call__(self, single):
        single["output"] = "test"
        return single

# User-defined map-batch class for batch inference
class CustomMapBatchActor:
    def __init__(self):
        print("init")

    def __call__(self, data: Dict[str, NDArray]):
        return data

ds = ds.map(CustomMapActor, concurrency=5)
ds = ds.map_batches(CustomMapBatchActor, concurrency=5, batch_size=1024)
ds.write_csv("/mnt/result-data/")
The code demonstrates a classic Ray Data workflow: read a CSV, apply a per‑row map, apply a batch map, and finally write the result to CSV. Thanks to streaming execution, the pipeline can handle inputs far larger than the available memory.
Core Concepts
Datasets and Blocks
A Dataset represents a distributed collection of data. Internally it is composed of blocks, each holding a contiguous subset of rows (1,000 rows by default). Blocks are stored as pyarrow.Table objects; if a row schema is too complex for Arrow, Ray Data falls back to pandas DataFrames.
Blocks reside in Ray's distributed object store and are referenced by ObjectRefs.
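The relationship between a dataset and its blocks can be illustrated with a toy sketch. This is plain Python for intuition only; `split_into_blocks` and `BLOCK_ROWS` are illustrative names, not Ray APIs:

```python
from typing import Any, Dict, List

BLOCK_ROWS = 1000  # the default rows-per-block figure mentioned above

def split_into_blocks(
    rows: List[Dict[str, Any]], block_rows: int = BLOCK_ROWS
) -> List[List[Dict[str, Any]]]:
    """Partition a row list into contiguous fixed-size blocks."""
    return [rows[i:i + block_rows] for i in range(0, len(rows), block_rows)]

rows = [{"id": i} for i in range(2500)]
blocks = split_into_blocks(rows)
print([len(b) for b in blocks])  # [1000, 1000, 500]
```

In real Ray Data, each of these blocks would be a pyarrow.Table placed in the object store, with only the ObjectRefs passed between operators.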
Operators and Plans
Ray Data builds a directed acyclic graph (DAG) of LogicalOperators (user‑visible operations such as read_csv, map, and write_csv). The Planner converts this logical DAG into a PhysicalPlan composed of PhysicalOperators. The conversion involves three steps:
Logical optimization (e.g., merging consecutive map operators).
Planning: mapping each logical operator to a concrete physical operator.
Physical optimization (e.g., fusing operators).
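The "merge consecutive map operators" rewrite from step 1 can be sketched with toy classes. The `MapOp` and `fuse_maps` names below are illustrative stand-ins, not Ray's internal optimizer:

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class MapOp:
    """Toy stand-in for a logical map operator holding a row-level UDF."""
    fn: Callable

def fuse_maps(plan: List[MapOp]) -> List[MapOp]:
    """Collapse adjacent MapOps into a single MapOp with composed UDFs."""
    fused: List[MapOp] = []
    for op in plan:
        if fused:
            prev = fused.pop()
            # Compose: the new operator applies prev.fn, then op.fn.
            fused.append(MapOp(fn=lambda x, f=prev.fn, g=op.fn: g(f(x))))
        else:
            fused.append(op)
    return fused

plan = [MapOp(lambda x: x + 1), MapOp(lambda x: x * 2)]
fused = fuse_maps(plan)
print(len(fused), fused[0].fn(3))  # 1 8  -> one operator computing (x + 1) * 2
```

Fusing avoids materializing an intermediate block between the two maps, which is the same motivation behind Ray Data's operator fusion in step 3.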
PhysicalOperator API
A PhysicalOperator transforms one or more input streams of RefBundles into a single output stream. It provides the following core methods:
add_input(refs, input_index) – receives upstream data.
has_next() – indicates whether output is ready.
get_next() – returns the next RefBundle.
Subclasses implement the internal logic via _add_input_inner and _get_next_inner. This design enables both bulk and streaming execution.
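A minimal toy version of this contract looks like the sketch below. The stub base class stands in for Ray's real PhysicalOperator, and lists of ints stand in for RefBundles; none of this is Ray's actual implementation:

```python
from collections import deque
from typing import Deque, List

class StubPhysicalOperator:
    """Stand-in base class: buffers outputs, exposes the streaming API."""
    def __init__(self) -> None:
        self._out: Deque[List[int]] = deque()

    def add_input(self, refs: List[int], input_index: int) -> None:
        self._add_input_inner(refs, input_index)

    def has_next(self) -> bool:
        return bool(self._out)

    def get_next(self) -> List[int]:
        return self._get_next_inner()

class DoubleOperator(StubPhysicalOperator):
    """Transforms each incoming bundle as soon as it arrives (streaming)."""
    def _add_input_inner(self, refs: List[int], input_index: int) -> None:
        self._out.append([x * 2 for x in refs])

    def _get_next_inner(self) -> List[int]:
        return self._out.popleft()

op = DoubleOperator()
op.add_input([1, 2, 3], input_index=0)
while op.has_next():
    print(op.get_next())  # [2, 4, 6]
```

Because output becomes available as soon as an input bundle is processed, a downstream operator can start consuming before the upstream has seen all of its inputs, which is exactly what makes streaming execution possible.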
Logical → Physical Conversion
plan_read_op
def plan_read_op(
    op: Read,
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> PhysicalOperator:
    assert len(physical_children) == 0

    def get_input_data(target_max_block_size) -> List[RefBundle]:
        parallelism = op.get_detected_parallelism()
        read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism)
        ret = []
        for read_task in read_tasks:
            read_task_ref = ray.put(read_task)
            ref_bundle = RefBundle(
                [(read_task_ref, cleaned_metadata(read_task, read_task_ref))],
                owns_blocks=False,
            )
            ret.append(ref_bundle)
        return ret

    inputs = InputDataBuffer(data_context, input_data_factory=get_input_data)

    def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
        for read_task in blocks:
            yield from read_task()

    transform_fns = [
        BlockMapTransformFn(do_read),
        BuildOutputBlocksMapTransformFn.for_blocks(),
    ]
    map_transformer = MapTransformer(transform_fns)
    return MapOperator.create(
        map_transformer,
        inputs,
        data_context,
        name=op.name,
        target_max_block_size=None,
        compute_strategy=TaskPoolStrategy(op._concurrency),
        ray_remote_args=op._ray_remote_args,
    )
The function creates an InputDataBuffer that yields ReadTask references, wraps the actual read logic in a BlockMapTransformFn, and finally builds a MapOperator (either a task‑pool or actor‑pool implementation, depending on the compute strategy).
InputDataBuffer
class InputDataBuffer(PhysicalOperator):
    def __init__(
        self,
        data_context: DataContext,
        input_data: Optional[List[RefBundle]] = None,
        input_data_factory: Optional[Callable[[int], List[RefBundle]]] = None,
    ):
        super().__init__("Input", [], data_context, target_max_block_size=None)
        if input_data is not None:
            self._input_data = input_data[:]
            self._is_input_initialized = True
            self._initialize_metadata()
        else:
            assert input_data_factory is not None
            self._input_data_factory = input_data_factory
            self._is_input_initialized = False
        self._input_data_index = 0

    def start(self, options: ExecutionOptions) -> None:
        if not self._is_input_initialized:
            self._input_data = self._input_data_factory(self.actual_target_max_block_size)
            self._is_input_initialized = True
            self._initialize_metadata()
        for bundle in self._input_data:
            self._metrics.on_input_received(bundle)
        super().start(options)

    def has_next(self) -> bool:
        return self._input_data_index < len(self._input_data)

    def _get_next_inner(self) -> RefBundle:
        bundle = self._input_data[self._input_data_index]
        self._input_data_index += 1
        return bundle
InputDataBuffer is always the first operator in a DAG; it never accepts upstream inputs.
MapOperator.create
class MapOperator(OneToOneOperator, ABC):
    @classmethod
    def create(
        cls,
        map_transformer: MapTransformer,
        input_op: PhysicalOperator,
        data_context: DataContext,
        name: str = "Map",
        compute_strategy: Optional[ComputeStrategy] = None,
        **kwargs,
    ) -> "MapOperator":
        if compute_strategy is None:
            compute_strategy = TaskPoolStrategy()
        if isinstance(compute_strategy, TaskPoolStrategy):
            from ray.data._internal.execution.operators.task_pool_map_operator import TaskPoolMapOperator
            return TaskPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                name=name,
                **kwargs,
            )
        elif isinstance(compute_strategy, ActorPoolStrategy):
            from ray.data._internal.execution.operators.actor_pool_map_operator import ActorPoolMapOperator
            return ActorPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                name=name,
                **kwargs,
            )
        else:
            raise ValueError(f"Unsupported execution strategy {compute_strategy}")
The factory selects either a TaskPoolMapOperator (Ray tasks) or an ActorPoolMapOperator (Ray actors) based on the provided compute strategy.
ActorPoolMapOperator
class ActorPoolMapOperator(MapOperator):
    def __init__(
        self,
        map_transformer: MapTransformer,
        input_op: PhysicalOperator,
        data_context: DataContext,
        compute_strategy: ActorPoolStrategy,
        name: str = "ActorPoolMap",
        **kwargs,
    ):
        super().__init__(map_transformer, input_op, data_context, name=name, **kwargs)
        self._actor_pool = _ActorPool(
            compute_strategy,
            self._start_actor,
            per_actor_resource_usage,
            self.data_context._enable_actor_pool_on_exit_hook,
        )
        self._bundle_queue = create_bundle_queue()
        self._cls = None
        self._inputs_done = False

    def _start_actor(self):
        assert self._cls is not None
        ctx = self.data_context
        if self._ray_remote_args_fn:
            self._refresh_actor_cls()
        actor = self._cls.options(_labels={self._OPERATOR_ID_LABEL_KEY: self.id}).remote(
            ctx, src_fn_name=self.name, map_transformer=self._map_transformer
        )
        res_ref = actor.get_location.options(name=f"{self.name}.get_location").remote()

        def _task_done_callback(res_ref):
            has_actor = self._actor_pool.pending_to_running(res_ref)
            if not has_actor:
                return
            self._dispatch_tasks()

        self._submit_metadata_task(res_ref, lambda: _task_done_callback(res_ref))
        return actor, res_ref

    def start(self, options: ExecutionOptions):
        self._actor_locality_enabled = options.actor_locality_enabled
        super().start(options)
        self._cls = ray.remote(**self._ray_remote_args)(_MapWorker)
        self._actor_pool.scale_up(self._actor_pool.min_size())
        refs = self._actor_pool.get_pending_actor_refs()
        try:
            timeout = self.data_context.wait_for_min_actors_s
            ray.get(refs, timeout=timeout)
        except ray.exceptions.GetTimeoutError as e:
            raise ray.exceptions.GetTimeoutError(
                "Timed out while starting actors. This may mean that the cluster "
                "does not have enough resources for the requested actor pool."
            ) from e
The operator maintains an actor pool and a queue of pending input bundles, and dispatches work to actors as resources become available.
_MapWorker (actor implementation)
class _MapWorker:
    def __init__(self, ctx: DataContext, src_fn_name: str, map_transformer: MapTransformer):
        DataContext._set_current(ctx)
        self.src_fn_name = src_fn_name
        self._map_transformer = map_transformer
        self._map_transformer.init()

    def get_location(self) -> str:
        return ray.get_runtime_context().get_node_id()

    def submit(
        self, data_context: DataContext, ctx: TaskContext, *blocks: Block, **kwargs
    ) -> Iterator[Union[Block, List[BlockMetadata]]]:
        yield from _map_task(self._map_transformer, data_context, ctx, *blocks, **kwargs)
_map_task (remote function)
def _map_task(
    map_transformer: MapTransformer,
    data_context: DataContext,
    ctx: TaskContext,
    *blocks: Block,
    **kwargs,
) -> Iterator[Union[Block, List[BlockMetadata]]]:
    DataContext._set_current(data_context)
    ctx.kwargs.update(kwargs)
    TaskContext.set_current(ctx)
    stats = BlockExecStats.builder()
    map_transformer.set_target_max_block_size(ctx.target_max_block_size)
    with MemoryProfiler(data_context.memory_usage_poll_interval_s) as profiler:
        for b_out in map_transformer.apply_transform(iter(blocks), ctx):
            m_out = BlockAccessor.for_block(b_out).get_metadata()
            m_out.exec_stats = stats.build()
            m_out.exec_stats.udf_time_s = map_transformer.udf_time()
            m_out.exec_stats.task_idx = ctx.task_idx
            m_out.exec_stats.max_uss_bytes = profiler.estimate_max_uss()
            yield b_out
            yield m_out
            stats = BlockExecStats.builder()
            profiler.reset()
    TaskContext.reset_current()
Dispatching Logic
def _dispatch_tasks(self):
    while self._bundle_queue:
        if self._actor_locality_enabled:
            actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
        else:
            actor = self._actor_pool.pick_actor()
        if actor is None:
            break
        bundle = self._bundle_queue.pop()
        self._metrics.on_input_dequeued(bundle)
        input_blocks = [block for block, _ in bundle.blocks]
        ctx = TaskContext(
            task_idx=self._next_data_task_idx,
            target_max_block_size=self.actual_target_max_block_size,
        )
        gen = actor.submit.options(
            num_returns="streaming",
            name=f"{self.name}.submit",
            **self._ray_actor_task_remote_args,
        ).remote(self.data_context, ctx, *input_blocks, **self.get_map_task_kwargs())

        def _task_done_callback(actor_to_return):
            self._actor_pool.return_actor(actor_to_return)
            self._dispatch_tasks()

        self._submit_data_task(gen, bundle, partial(_task_done_callback, actor_to_return=actor))
The method repeatedly picks a ready actor, pulls a bundle from the queue, and submits the bundle to the actor via a streaming remote call. The returned ObjectRefGenerator is wrapped in a DataOpTask for tracking.
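The core pairing of idle actors with queued bundles can be reduced to a small toy loop. `ToyActorPool` and `dispatch` below are illustrative names, not Ray internals, and strings stand in for actor handles:

```python
from collections import deque
from typing import Deque, List, Optional, Tuple

class ToyActorPool:
    """Tracks which (fake) actors are idle and hands them out on request."""
    def __init__(self, size: int) -> None:
        self._idle: Deque[str] = deque(f"actor-{i}" for i in range(size))

    def pick_actor(self) -> Optional[str]:
        return self._idle.popleft() if self._idle else None

    def return_actor(self, actor: str) -> None:
        self._idle.append(actor)

def dispatch(pool: ToyActorPool, queue: Deque[int]) -> List[Tuple[str, int]]:
    """Pair queued bundles with idle actors until one side runs out."""
    submitted = []
    while queue:
        actor = pool.pick_actor()
        if actor is None:
            break  # backpressure: no idle actor, stop dispatching for now
        submitted.append((actor, queue.popleft()))
    return submitted

pool = ToyActorPool(2)
queue = deque([10, 11, 12])
print(dispatch(pool, queue))  # [('actor-0', 10), ('actor-1', 11)]; bundle 12 waits
```

In the real operator, the completion callback returns the actor to the pool and re-invokes the dispatch loop, so the waiting bundle is picked up as soon as any actor finishes.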
Streaming Execution Engine
OpState
class OpState:
    def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]):
        assert len(inqueues) == len(op.input_dependencies)
        self.inqueues = inqueues
        self.outqueue = OpBufferQueue()
        self.op = op
        self.num_completed_tasks = 0
        self._finished = False
        self._exception = None

    def add_output(self, ref: RefBundle) -> None:
        self.outqueue.append(ref)
        self.num_completed_tasks += 1
        if self.progress_bar:
            assert ref.num_rows() is not None
            self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())

    def dispatch_next_task(self) -> None:
        for i, inqueue in enumerate(self.inqueues):
            ref = inqueue.pop()
            if ref is not None:
                self.op.add_input(ref, input_index=i)
                return
        assert False, "Nothing to dispatch"

    def get_output_blocking(self, output_split_idx: Optional[int] = None) -> RefBundle:
        while True:
            if self._exception is not None:
                raise self._exception
            elif self._finished and not self.outqueue.has_next(output_split_idx):
                raise StopIteration()
            ref = self.outqueue.pop(output_split_idx)
            if ref is not None:
                return ref
            time.sleep(0.01)
StreamingExecutor
class StreamingExecutor(Executor, threading.Thread):
    def execute(
        self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None
    ) -> Iterator[RefBundle]:
        self._topology, _ = build_streaming_topology(dag, self._options)
        self._has_op_completed = {op: False for op in self._topology}
        self._output_node = self._topology[dag]
        self.start()

        class StreamIterator(OutputIterator):
            def __init__(self, outer: Executor):
                self._outer = outer

            def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle:
                try:
                    item = self._outer._output_node.get_output_blocking(output_split_idx)
                    if self._outer._global_info:
                        self._outer._global_info.update(item.num_rows(), dag.num_output_rows_total())
                    return item
                except BaseException as e:
                    self._outer.shutdown(e if not isinstance(e, StopIteration) else None)
                    raise

            def __del__(self):
                self._outer.shutdown()

        return StreamIterator(self)

    def run(self):
        try:
            while True:
                continue_sched = self._scheduling_loop_step(self._topology)
                if not continue_sched or self._shutdown:
                    break
        except Exception as e:
            self._output_node.mark_finished(e)
        finally:
            self._output_node.mark_finished()

    def _scheduling_loop_step(self, topology: Topology) -> bool:
        self._resource_manager.update_usages()
        num_errored_blocks = process_completed_tasks(
            topology, self._resource_manager, self._max_errored_blocks
        )
        op = select_operator_to_run(
            topology, self._resource_manager, self._backpressure_policies,
            self._autoscaler, ensure_at_least_one_running=self._consumer_idling(),
        )
        while op is not None:
            topology[op].dispatch_next_task()
            self._resource_manager.update_usages()
            op = select_operator_to_run(
                topology, self._resource_manager, self._backpressure_policies,
                self._autoscaler, ensure_at_least_one_running=self._consumer_idling(),
            )
        for op, state in topology.items():
            while op.has_next():
                state.add_output(op.get_next())
        return not all(op.completed() for op in topology)
process_completed_tasks
def process_completed_tasks(
    topology: Topology, resource_manager: ResourceManager, max_errored_blocks: int
) -> int:
    active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
    for op, state in topology.items():
        for task in op.get_active_tasks():
            active_tasks[task.get_waitable()] = (state, task)
    num_errored_blocks = 0
    if active_tasks:
        ready, _ = ray.wait(
            list(active_tasks.keys()), num_returns=len(active_tasks),
            fetch_local=False, timeout=0.1,
        )
        ready_tasks_by_op = defaultdict(list)
        for ref in ready:
            state, task = active_tasks[ref]
            ready_tasks_by_op[state].append(task)
        for state, ready_tasks in ready_tasks_by_op.items():
            ready_tasks = sorted(ready_tasks, key=lambda t: t.task_index())
            for task in ready_tasks:
                if isinstance(task, DataOpTask):
                    try:
                        bytes_read = task.on_data_ready(max_bytes_to_read_per_op.get(state))
                        if state in max_bytes_to_read_per_op:
                            max_bytes_to_read_per_op[state] -= bytes_read
                    except Exception as e:
                        num_errored_blocks += 1
                        should_ignore = (
                            max_errored_blocks < 0 or max_errored_blocks >= num_errored_blocks
                        )
                        if should_ignore:
                            logger.error(f"Ignoring error from operator {state.op.name}", exc_info=e)
                        else:
                            logger.error("Dataset execution will now abort.")
                            raise
                else:
                    assert isinstance(task, MetadataOpTask)
                    task.on_task_finished()
    for op, op_state in topology.items():
        while op.has_next():
            op_state.add_output(op.get_next())
    return num_errored_blocks
The scheduler repeatedly waits for any task (DataOpTask or MetadataOpTask) to become ready, pulls its output, moves it into the appropriate OpState queue, and then dispatches new work downstream.
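The wait-then-harvest pattern at the heart of this loop can be mimicked without Ray by using concurrent.futures.wait in place of ray.wait. This is a toy analogue, not Ray code; `run_all` is an illustrative name:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_all(tasks):
    """Drive zero-arg callables to completion, harvesting results as they finish."""
    results = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        pending = {pool.submit(t) for t in tasks}
        while pending:
            # Like ray.wait with a short timeout: poll, take whatever is ready.
            ready, pending = wait(pending, timeout=0.1, return_when=FIRST_COMPLETED)
            for fut in ready:
                # Analogous to moving a finished task's output into an OpState queue.
                results.append(fut.result())
    return results

out = run_all([lambda i=i: i * i for i in range(4)])
print(sorted(out))  # [0, 1, 4, 9]
```

The short timeout is what keeps the scheduling loop responsive: rather than blocking until everything is done, each iteration collects whatever finished and immediately returns to dispatching new work.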
Conclusion
Ray Data offers a clean high‑level API while its internal engine performs sophisticated planning, optimization, and streaming execution. The workflow can be summarized as:
Build a LogicalPlan from user‑level Dataset calls.
Convert the logical plan into a PhysicalPlan composed of operators such as Read, MapOperator, and Write.
Execute the physical plan with StreamingExecutor, which runs operators in parallel using Ray tasks or actors, moves data through RefBundle queues, and continuously schedules work until the DAG finishes.
This design enables Ray Data to process datasets that exceed memory limits, efficiently utilize heterogeneous resources (CPU/GPU), and provide a flexible foundation for custom data sources, sinks, and user‑defined transformations.
Alibaba Cloud Developer
Alibaba's official tech channel, featuring all of its technology innovations.