
How Ray Data Streams Data: From Logical Plans to Distributed Execution

This deep dive explains how Ray Data turns user-level Dataset calls into a logical plan, optimizes that plan, lowers it to a physical streaming execution graph, and runs it on a cluster using task and actor pools, detailing each component from read sources to write sinks with code examples.

Alibaba Cloud Developer

Introduction

Ray Data is a distributed data‑processing library built on top of Ray. It provides high‑performance, scalable APIs that can be combined with AI workloads such as batch inference and data preprocessing. The library uses a streaming execution model to process datasets that are larger than the memory of a single machine, allowing CPU and GPU resources to be used concurrently.

Example Pipeline

import ray
from typing import Dict
from numpy.typing import NDArray

# Load a CSV dataset directly from NAS/S3
ds = ray.data.read_csv("/mnt/example-data/iris.csv")

# User‑defined map class for per‑row inference
class CustomMapActor:
    def __init__(self):
        print("init")
    def __call__(self, single):
        single["output"] = "test"
        return single

# User‑defined map‑batch class for batch inference
class CustomMapBatchActor:
    def __init__(self):
        print("init")
    def __call__(self, data: Dict[str, NDArray]):
        return data

ds = ds.map(CustomMapActor, concurrency=5)
ds = ds.map_batches(CustomMapBatchActor, concurrency=5, batch_size=1024)
ds.write_csv("/mnt/result-data/")

The code demonstrates a classic Ray Data workflow: read a CSV, apply a per‑row map, apply a batch map, and finally write the result to CSV. Thanks to streaming execution, the pipeline can handle inputs far larger than the available memory.
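The streaming behavior can be pictured with plain Python generators: each stage pulls from the previous one, so only a bounded window of rows is in flight at any time. This is a toy sketch of the idea, not Ray Data code (read_rows, map_rows, and write_rows are hypothetical stand-ins):

```python
from typing import Dict, Iterator


def read_rows(n: int) -> Iterator[Dict]:
    # Stand-in for read_csv: yields rows one at a time instead of loading them all.
    for i in range(n):
        yield {"id": i}


def map_rows(rows: Iterator[Dict]) -> Iterator[Dict]:
    # Stand-in for ds.map: transforms each row as it flows through.
    for row in rows:
        row["output"] = "test"
        yield row


def write_rows(rows: Iterator[Dict]) -> int:
    # Stand-in for write_csv: consuming the stream is what drives the pipeline.
    written = 0
    for _ in rows:
        written += 1
    return written


# Nothing is read until the sink starts pulling; memory stays bounded
# regardless of how many rows flow through.
count = write_rows(map_rows(read_rows(100_000)))
```

Ray Data applies the same pull-based principle, but at block granularity and across many processes.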

Core Concepts

Datasets and Blocks

A Dataset represents a distributed collection of data. Internally it is composed of blocks, each holding a contiguous subset of rows. Blocks are stored as pyarrow.Table objects; if a row schema is too complex to represent in Arrow, Ray Data falls back to pandas DataFrames. Block size is bounded by a configurable byte target (DataContext.target_max_block_size) rather than a fixed row count.

Blocks reside in Ray’s distributed object store and are referenced by ObjectRefs, so operators exchange lightweight references rather than the data itself.
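To make the block concept concrete, here is a minimal sketch of building size-bounded blocks from a row stream. It is a toy model: real blocks are Arrow tables, and Ray Data estimates block size from Arrow buffers, not sys.getsizeof.

```python
import sys
from typing import Dict, Iterator, List


def build_blocks(rows: Iterator[Dict], target_max_block_bytes: int) -> Iterator[List[Dict]]:
    # Accumulate rows into a block; cut a new block once the byte budget is hit.
    block: List[Dict] = []
    block_bytes = 0
    for row in rows:
        block.append(row)
        block_bytes += sys.getsizeof(row)  # crude size estimate for the sketch
        if block_bytes >= target_max_block_bytes:
            yield block
            block, block_bytes = [], 0
    if block:  # flush the final partial block
        yield block


blocks = list(build_blocks(({"id": i} for i in range(100)), target_max_block_bytes=1000))
```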

Operators and Plans

Ray Data builds a directed acyclic graph (DAG) of LogicalOperators (user-visible operations such as read_csv, map, and write_csv). The Planner converts this logical DAG into a PhysicalPlan composed of PhysicalOperators. The conversion involves three steps:

Logical optimization (e.g., merging consecutive map operators).

Planning: mapping each logical operator to a concrete physical operator.

Physical optimization (e.g., fusing operators).
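The first step, merging consecutive map operators, can be sketched as composing adjacent map functions into one (a toy model; fuse_maps is hypothetical, and real fusion operates on operator objects and checks compatibility before merging):

```python
from typing import Callable, List, Optional, Tuple

# A plan is a list of (operator_kind, fn) pairs; only adjacent "map"
# operators are merged, mirroring the logical-optimization rule.
Op = Tuple[str, Optional[Callable]]


def fuse_maps(plan: List[Op]) -> List[Op]:
    fused: List[Op] = []
    for kind, fn in plan:
        if kind == "map" and fused and fused[-1][0] == "map":
            prev = fused.pop()[1]
            # Compose the two UDFs so the intermediate result is never materialized.
            fused.append(("map", lambda x, f=prev, g=fn: g(f(x))))
        else:
            fused.append((kind, fn))
    return fused


plan = [("read", None), ("map", lambda x: x + 1), ("map", lambda x: x * 2)]
optimized = fuse_maps(plan)
```

After fusion the plan contains a single map operator whose function applies both UDFs in one pass.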

PhysicalOperator API

A PhysicalOperator transforms one or more input streams of RefBundles into a single output stream. It provides three core methods:

add_input(refs, input_index) – receives upstream data.

has_next() – indicates whether an output is ready.

get_next() – returns the next RefBundle.

Subclasses implement the internal logic via _add_input_inner and _get_next_inner. This design enables both bulk and streaming execution.
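A minimal operator honoring this contract might look like the following sketch (ToyMapOperator is illustrative; real operators pass RefBundles of object references and run work asynchronously):

```python
from collections import deque
from typing import Any, Callable, Deque


class ToyMapOperator:
    """Minimal illustration of the PhysicalOperator pull-based contract."""

    def __init__(self, fn: Callable[[Any], Any]):
        self._fn = fn
        self._out: Deque[Any] = deque()

    def add_input(self, item: Any, input_index: int = 0) -> None:
        # _add_input_inner equivalent: transform eagerly, queue the result.
        self._out.append(self._fn(item))

    def has_next(self) -> bool:
        return bool(self._out)

    def get_next(self) -> Any:
        # _get_next_inner equivalent: pop the next ready output.
        return self._out.popleft()


op = ToyMapOperator(lambda x: x * 10)
op.add_input(1)
op.add_input(2)
results = []
while op.has_next():
    results.append(op.get_next())
```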

Logical → Physical Conversion

plan_read_op

def plan_read_op(op: Read, physical_children: List[PhysicalOperator], data_context: DataContext) -> PhysicalOperator:
    assert len(physical_children) == 0
    def get_input_data(target_max_block_size) -> List[RefBundle]:
        parallelism = op.get_detected_parallelism()
        read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism)
        ret = []
        for read_task in read_tasks:
            read_task_ref = ray.put(read_task)
            ref_bundle = RefBundle([(read_task_ref, cleaned_metadata(read_task, read_task_ref))], owns_blocks=False)
            ret.append(ref_bundle)
        return ret
    inputs = InputDataBuffer(data_context, input_data_factory=get_input_data)
    def do_read(blocks: Iterable[ReadTask], _: TaskContext) -> Iterable[Block]:
        for read_task in blocks:
            yield from read_task()
    transform_fns = [BlockMapTransformFn(do_read), BuildOutputBlocksMapTransformFn.for_blocks()]
    map_transformer = MapTransformer(transform_fns)
    return MapOperator.create(
        map_transformer,
        inputs,
        data_context,
        name=op.name,
        target_max_block_size=None,
        compute_strategy=TaskPoolStrategy(op._concurrency),
        ray_remote_args=op._ray_remote_args,
    )

The function creates an InputDataBuffer that yields ReadTask references, wraps the actual read logic in a BlockMapTransformFn, and finally builds a MapOperator (either a task‑pool or actor‑pool implementation depending on the compute strategy).
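The two-level structure is worth emphasizing: planning produces lightweight ReadTasks, and only executing a task yields actual blocks. A sketch of that pattern (get_read_tasks and do_read here are toy stand-ins for the real ones):

```python
from typing import Callable, Iterator, List

# A ReadTask is just a zero-arg callable that yields blocks when invoked,
# so the planner can create and ship many tasks without reading any data.
ReadTask = Callable[[], Iterator[List[int]]]


def get_read_tasks(num_rows: int, parallelism: int) -> List[ReadTask]:
    # Split the row range into roughly `parallelism` tasks.
    chunk = (num_rows + parallelism - 1) // parallelism
    tasks: List[ReadTask] = []
    for start in range(0, num_rows, chunk):
        end = min(start + chunk, num_rows)
        tasks.append(lambda s=start, e=end: iter([list(range(s, e))]))
    return tasks


def do_read(tasks: List[ReadTask]) -> Iterator[List[int]]:
    # Mirrors the do_read closure above: execute each task, yield its blocks.
    for task in tasks:
        yield from task()


tasks = get_read_tasks(num_rows=10, parallelism=3)
blocks = list(do_read(tasks))
```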

InputDataBuffer

class InputDataBuffer(PhysicalOperator):
    def __init__(self, data_context: DataContext, input_data: Optional[List[RefBundle]] = None,
                 input_data_factory: Optional[Callable[[int], List[RefBundle]]] = None):
        super().__init__("Input", [], data_context, target_max_block_size=None)
        if input_data is not None:
            self._input_data = input_data[:]
            self._is_input_initialized = True
            self._initialize_metadata()
        else:
            assert input_data_factory is not None
            self._input_data_factory = input_data_factory
            self._is_input_initialized = False
        self._input_data_index = 0

    def start(self, options: ExecutionOptions) -> None:
        if not self._is_input_initialized:
            self._input_data = self._input_data_factory(self.actual_target_max_block_size)
            self._is_input_initialized = True
            self._initialize_metadata()
        for bundle in self._input_data:
            self._metrics.on_input_received(bundle)
        super().start(options)

    def has_next(self) -> bool:
        return self._input_data_index < len(self._input_data)

    def _get_next_inner(self) -> RefBundle:
        bundle = self._input_data[self._input_data_index]
        self._input_data_index += 1
        return bundle
InputDataBuffer is always the first operator in a DAG; it never accepts upstream inputs.

MapOperator.create

class MapOperator(OneToOneOperator, ABC):
    @classmethod
    def create(cls, map_transformer: MapTransformer, input_op: PhysicalOperator,
               data_context: DataContext, name: str = "Map",
               compute_strategy: Optional[ComputeStrategy] = None, **kwargs) -> "MapOperator":
        if compute_strategy is None:
            compute_strategy = TaskPoolStrategy()
        if isinstance(compute_strategy, TaskPoolStrategy):
            from ray.data._internal.execution.operators.task_pool_map_operator import TaskPoolMapOperator
            return TaskPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                name=name,
                **kwargs,
            )
        elif isinstance(compute_strategy, ActorPoolStrategy):
            from ray.data._internal.execution.operators.actor_pool_map_operator import ActorPoolMapOperator
            return ActorPoolMapOperator(
                map_transformer,
                input_op,
                data_context,
                name=name,
                **kwargs,
            )
        else:
            raise ValueError(f"Unsupported execution strategy {compute_strategy}")

The factory selects either a TaskPoolMapOperator (Ray tasks) or an ActorPoolMapOperator (Ray actors) based on the provided compute strategy.

ActorPoolMapOperator

class ActorPoolMapOperator(MapOperator):
    def __init__(self, map_transformer: MapTransformer, input_op: PhysicalOperator,
                 data_context: DataContext, compute_strategy: ActorPoolStrategy,
                 name: str = "ActorPoolMap", **kwargs):
        super().__init__(map_transformer, input_op, data_context, name=name, **kwargs)
        self._actor_pool = _ActorPool(
            compute_strategy,
            self._start_actor,
            per_actor_resource_usage,
            self.data_context._enable_actor_pool_on_exit_hook,
        )
        self._bundle_queue = create_bundle_queue()
        self._cls = None
        self._inputs_done = False

    def _start_actor(self):
        assert self._cls is not None
        ctx = self.data_context
        if self._ray_remote_args_fn:
            self._refresh_actor_cls()
        actor = self._cls.options(_labels={self._OPERATOR_ID_LABEL_KEY: self.id}).remote(
            ctx, src_fn_name=self.name, map_transformer=self._map_transformer)
        res_ref = actor.get_location.options(name=f"{self.name}.get_location").remote()
        def _task_done_callback(res_ref):
            has_actor = self._actor_pool.pending_to_running(res_ref)
            if not has_actor:
                return
            self._dispatch_tasks()
        self._submit_metadata_task(res_ref, lambda: _task_done_callback(res_ref))
        return actor, res_ref

    def start(self, options: ExecutionOptions):
        self._actor_locality_enabled = options.actor_locality_enabled
        super().start(options)
        self._cls = ray.remote(**self._ray_remote_args)(_MapWorker)
        self._actor_pool.scale_up(self._actor_pool.min_size())
        refs = self._actor_pool.get_pending_actor_refs()
        try:
            timeout = self.data_context.wait_for_min_actors_s
            ray.get(refs, timeout=timeout)
        except ray.exceptions.GetTimeoutError as e:
            raise ray.exceptions.GetTimeoutError(
                "Timed out while starting actors. This may mean that the cluster "
                "does not have enough resources for the requested actor pool."
            ) from e

The operator maintains an actor pool, a queue of pending input bundles, and dispatches work to actors as resources become available.
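The pool bookkeeping, picking a free actor and returning it when its task finishes, can be modeled roughly as follows (ToyActorPool is illustrative; the real _ActorPool also tracks pending actors, locality, and autoscaling):

```python
from typing import Dict, List, Optional


class ToyActorPool:
    """Toy _ActorPool: tracks in-flight tasks per actor and hands out the
    least-loaded one, up to a per-actor cap."""

    def __init__(self, actors: List[str], max_tasks_in_flight: int = 2):
        self._in_flight: Dict[str, int] = {a: 0 for a in actors}
        self._cap = max_tasks_in_flight

    def pick_actor(self) -> Optional[str]:
        # Choose the least-busy actor that still has capacity; None means
        # the caller must stop dispatching until an actor is returned.
        candidates = [a for a, n in self._in_flight.items() if n < self._cap]
        if not candidates:
            return None
        actor = min(candidates, key=lambda a: self._in_flight[a])
        self._in_flight[actor] += 1
        return actor

    def return_actor(self, actor: str) -> None:
        self._in_flight[actor] -= 1


pool = ToyActorPool(["a1", "a2"], max_tasks_in_flight=1)
first, second, third = pool.pick_actor(), pool.pick_actor(), pool.pick_actor()
```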

_MapWorker (actor implementation)

class _MapWorker:
    def __init__(self, ctx: DataContext, src_fn_name: str, map_transformer: MapTransformer):
        DataContext._set_current(ctx)
        self.src_fn_name = src_fn_name
        self._map_transformer = map_transformer
        self._map_transformer.init()

    def get_location(self) -> str:
        return ray.get_runtime_context().get_node_id()

    def submit(self, data_context: DataContext, ctx: TaskContext, *blocks: Block, **kwargs) -> Iterator[Union[Block, List[BlockMetadata]]]:
        yield from _map_task(self._map_transformer, data_context, ctx, *blocks, **kwargs)

_map_task (remote function)

def _map_task(map_transformer: MapTransformer, data_context: DataContext, ctx: TaskContext,
              *blocks: Block, **kwargs) -> Iterator[Union[Block, List[BlockMetadata]]]:
    DataContext._set_current(data_context)
    ctx.kwargs.update(kwargs)
    TaskContext.set_current(ctx)
    stats = BlockExecStats.builder()
    map_transformer.set_target_max_block_size(ctx.target_max_block_size)
    with MemoryProfiler(data_context.memory_usage_poll_interval_s) as profiler:
        for b_out in map_transformer.apply_transform(iter(blocks), ctx):
            m_out = BlockAccessor.for_block(b_out).get_metadata()
            m_out.exec_stats = stats.build()
            m_out.exec_stats.udf_time_s = map_transformer.udf_time()
            m_out.exec_stats.task_idx = ctx.task_idx
            m_out.exec_stats.max_uss_bytes = profiler.estimate_max_uss()
            yield b_out
            yield m_out
            stats = BlockExecStats.builder()
            profiler.reset()
    TaskContext.reset_current()
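Note the yield pattern above: each output block is immediately followed by its metadata, so the consumer reads the stream in (block, metadata) pairs. A toy sketch of both sides of that protocol (toy_map_task and pair_blocks_with_metadata are illustrative):

```python
from typing import Any, Iterator, List, Tuple


def toy_map_task(blocks: List[List[int]]) -> Iterator[Any]:
    # Mimics _map_task's output shape: a block, then metadata describing it.
    for block in blocks:
        out = [x + 1 for x in block]
        yield out
        yield {"num_rows": len(out)}


def pair_blocks_with_metadata(stream: Iterator[Any]) -> Iterator[Tuple[Any, Any]]:
    # Re-pair the alternating block/metadata stream on the consumer side.
    it = iter(stream)
    for block in it:
        metadata = next(it)  # each block is immediately followed by its metadata
        yield block, metadata


pairs = list(pair_blocks_with_metadata(toy_map_task([[1, 2], [3]])))
```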

Dispatching Logic

def _dispatch_tasks(self):
    while self._bundle_queue:
        if self._actor_locality_enabled:
            actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
        else:
            actor = self._actor_pool.pick_actor()
        if actor is None:
            break
        bundle = self._bundle_queue.pop()
        self._metrics.on_input_dequeued(bundle)
        input_blocks = [block for block, _ in bundle.blocks]
        ctx = TaskContext(task_idx=self._next_data_task_idx,
                         target_max_block_size=self.actual_target_max_block_size)
        gen = actor.submit.options(
            num_returns="streaming",
            name=f"{self.name}.submit",
            **self._ray_actor_task_remote_args,
        ).remote(self.data_context, ctx, *input_blocks, **self.get_map_task_kwargs())
        def _task_done_callback(actor_to_return):
            self._actor_pool.return_actor(actor_to_return)
            self._dispatch_tasks()
        self._submit_data_task(gen, bundle, partial(_task_done_callback, actor_to_return=actor))

The method repeatedly picks a ready actor, pulls a bundle from the queue, and submits the bundle to the actor via a streaming remote call. The returned ObjectRefGenerator is wrapped in a DataOpTask for tracking.
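The dispatch/callback cycle can be condensed into a toy feedback loop: drain the queue while free actors exist, and let the completion callback return the actor and re-enter dispatch. In this sketch tasks complete synchronously, whereas the real operator is driven asynchronously by Ray's streaming generators:

```python
from collections import deque
from typing import Any, Callable, Deque, List


def dispatch(queue: Deque[Any], free_actors: Deque[str],
             run_task: Callable[[str, Any], Any]) -> List[Any]:
    results: List[Any] = []

    def _dispatch():
        # Like _dispatch_tasks: keep going while there is work AND a free actor.
        while queue and free_actors:
            actor = free_actors.popleft()
            bundle = queue.popleft()
            results.append(run_task(actor, bundle))
            # Task "completes" immediately in this toy; the callback returns
            # the actor to the pool so more work can be dispatched.
            free_actors.append(actor)

    _dispatch()
    return results


out = dispatch(deque([1, 2, 3]), deque(["a1"]), lambda actor, b: (actor, b))
```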

Streaming Execution Engine

OpState

class OpState:
    def __init__(self, op: PhysicalOperator, inqueues: List[OpBufferQueue]):
        assert len(inqueues) == len(op.input_dependencies)
        self.inqueues = inqueues
        self.outqueue = OpBufferQueue()
        self.op = op
        self.num_completed_tasks = 0
        self._finished = False
        self._exception = None

    def add_output(self, ref: RefBundle) -> None:
        self.outqueue.append(ref)
        self.num_completed_tasks += 1
        if self.progress_bar:
            assert ref.num_rows() is not None
            self.progress_bar.update(ref.num_rows(), self.op.num_output_rows_total())

    def dispatch_next_task(self) -> None:
        for i, inqueue in enumerate(self.inqueues):
            ref = inqueue.pop()
            if ref is not None:
                self.op.add_input(ref, input_index=i)
                return
        assert False, "Nothing to dispatch"

    def get_output_blocking(self, output_split_idx: Optional[int] = None) -> RefBundle:
        while True:
            if self._exception is not None:
                raise self._exception
            elif self._finished and not self.outqueue.has_next(output_split_idx):
                raise StopIteration()
            ref = self.outqueue.pop(output_split_idx)
            if ref is not None:
                return ref
            time.sleep(0.01)

StreamingExecutor

class StreamingExecutor(Executor, threading.Thread):
    def execute(self, dag: PhysicalOperator, initial_stats: Optional[DatasetStats] = None) -> Iterator[RefBundle]:
        self._topology, _ = build_streaming_topology(dag, self._options)
        self._has_op_completed = {op: False for op in self._topology}
        self._output_node = self._topology[dag]
        self.start()
        class StreamIterator(OutputIterator):
            def __init__(self, outer: Executor):
                self._outer = outer
            def get_next(self, output_split_idx: Optional[int] = None) -> RefBundle:
                try:
                    item = self._outer._output_node.get_output_blocking(output_split_idx)
                    if self._outer._global_info:
                        self._outer._global_info.update(item.num_rows(), dag.num_output_rows_total())
                    return item
                except BaseException as e:
                    self._outer.shutdown(e if not isinstance(e, StopIteration) else None)
                    raise
            def __del__(self):
                self._outer.shutdown()
        return StreamIterator(self)

    def run(self):
        try:
            while True:
                continue_sched = self._scheduling_loop_step(self._topology)
                if not continue_sched or self._shutdown:
                    break
        except Exception as e:
            self._output_node.mark_finished(e)
        finally:
            self._output_node.mark_finished()

    def _scheduling_loop_step(self, topology: Topology) -> bool:
        self._resource_manager.update_usages()
        num_errored_blocks = process_completed_tasks(
            topology, self._resource_manager, self._max_errored_blocks)
        op = select_operator_to_run(
            topology, self._resource_manager, self._backpressure_policies,
            self._autoscaler, ensure_at_least_one_running=self._consumer_idling())
        while op is not None:
            topology[op].dispatch_next_task()
            self._resource_manager.update_usages()
            op = select_operator_to_run(
                topology, self._resource_manager, self._backpressure_policies,
                self._autoscaler, ensure_at_least_one_running=self._consumer_idling())
        for op, state in topology.items():
            while op.has_next():
                state.add_output(op.get_next())
        return not all(op.completed() for op in topology)

process_completed_tasks

def process_completed_tasks(topology: Topology, resource_manager: ResourceManager,
                            max_errored_blocks: int) -> int:
    active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
    for op, state in topology.items():
        for task in op.get_active_tasks():
            active_tasks[task.get_waitable()] = (state, task)
    num_errored_blocks = 0
    if active_tasks:
        ready, _ = ray.wait(list(active_tasks.keys()), num_returns=len(active_tasks),
                           fetch_local=False, timeout=0.1)
        ready_tasks_by_op = defaultdict(list)
        for ref in ready:
            state, task = active_tasks[ref]
            ready_tasks_by_op[state].append(task)
        for state, ready_tasks in ready_tasks_by_op.items():
            ready_tasks = sorted(ready_tasks, key=lambda t: t.task_index())
            for task in ready_tasks:
                if isinstance(task, DataOpTask):
                    try:
                        # NOTE: max_bytes_to_read_per_op is computed earlier in this
                        # function from the backpressure policies (omitted in this excerpt).
                        bytes_read = task.on_data_ready(max_bytes_to_read_per_op.get(state))
                        if state in max_bytes_to_read_per_op:
                            max_bytes_to_read_per_op[state] -= bytes_read
                    except Exception as e:
                        num_errored_blocks += 1
                        should_ignore = (max_errored_blocks < 0 or max_errored_blocks >= num_errored_blocks)
                        if should_ignore:
                            logger.error(f"Ignoring error from operator {state.op.name}", exc_info=e)
                        else:
                            logger.error("Dataset execution will now abort.")
                            raise
                else:
                    assert isinstance(task, MetadataOpTask)
                    task.on_task_finished()
    for op, op_state in topology.items():
        while op.has_next():
            op_state.add_output(op.get_next())
    return num_errored_blocks

The scheduler repeatedly waits for any task (DataOpTask or MetadataOpTask) to become ready, pulls its output, moves it into the appropriate OpState queue, and then dispatches new work downstream.
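The whole scheduling loop can be condensed into a toy scheduler that shuttles items between per-operator queues until nothing is left to do (run_toy_scheduler is illustrative; real operators run concurrently rather than synchronously):

```python
from collections import deque
from typing import Any, Callable, List


def run_toy_scheduler(operators: List[Callable[[Any], Any]], source: List[Any]) -> List[Any]:
    # One input queue per operator; the first is seeded with the source data.
    inqueues = [deque(source)] + [deque() for _ in operators[1:]]
    outqueue: deque = deque()
    progress = True
    while progress:
        progress = False
        for i, op in enumerate(operators):
            if inqueues[i]:
                item = inqueues[i].popleft()  # dispatch_next_task
                result = op(item)             # run the operator's task
                dest = inqueues[i + 1] if i + 1 < len(operators) else outqueue
                dest.append(result)           # add_output to the next queue
                progress = True
    return list(outqueue)


out = run_toy_scheduler([lambda x: x + 1, lambda x: x * 2], source=[1, 2, 3])
```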

Conclusion

Ray Data offers a clean high‑level API while its internal engine performs sophisticated planning, optimization, and streaming execution. The workflow can be summarized as:

Build a LogicalPlan from user‑level Dataset calls.

Convert the logical plan into a PhysicalPlan composed of operators such as Read, MapOperator, and Write.

Execute the physical plan with StreamingExecutor , which runs operators in parallel using Ray tasks or actors, moves data through RefBundle queues, and continuously schedules work until the DAG finishes.

This design enables Ray Data to process datasets that exceed memory limits, efficiently utilize heterogeneous resources (CPU/GPU), and provide a flexible foundation for custom data sources, sinks, and user‑defined transformations.
