How Ray Data Accelerates AI Workloads with Streaming Execution
Ray Data is a scalable data processing library built on Ray. It provides high-performance, streaming-execution APIs for AI workloads such as batch inference, data preprocessing, and training-data ingestion; it schedules work across CPU and GPU resources, supports diverse data formats, and integrates with popular ML frameworks.
Core Concepts in Ray Data
Datasets and Blocks
A Dataset is the primary Python API representing a distributed collection of rows. Users create a Dataset from external storage or in‑memory objects, apply lazy transformations, and then either write the results out or feed them to training jobs. Transformations are executed only when an action such as show() or materialize() is called, allowing Ray Data to optimise the execution plan.
A Block is a columnar partition of a Dataset (e.g., an Apache Arrow table). Each Dataset is split into multiple blocks, and processing occurs at the block level, enabling parallel execution across the cluster.
For example, a Dataset of 3,000 rows might be split into three blocks of 1,000 rows each. Internally, blocks are stored in Ray’s shared object store and can be accessed as Pandas DataFrames or PyArrow Tables.
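Block-level processing can be sketched in plain Python. This is an illustrative analogy only, not the Ray Data API: a "dataset" is modelled as a list of columnar blocks (plain dicts of column lists), and each block is transformed independently, which is what makes parallel execution possible.

```python
from concurrent.futures import ThreadPoolExecutor

# Analogy only: three columnar "blocks" of 1,000 rows each,
# modelled as plain dicts rather than Arrow tables.
blocks = [
    {"id": list(range(0, 1000))},
    {"id": list(range(1000, 2000))},
    {"id": list(range(2000, 3000))},
]

def process_block(block):
    # Each block is transformed independently of the others,
    # so blocks can be processed in parallel across workers.
    return {"id": block["id"], "doubled": [v * 2 for v in block["id"]]}

with ThreadPoolExecutor() as pool:
    processed = list(pool.map(process_block, blocks))

print(len(processed), processed[0]["doubled"][:3])  # 3 [0, 2, 4]
```

In Ray Data the same idea applies at cluster scale: blocks live in the distributed object store and are processed by Ray tasks rather than threads.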
Streaming Execution Model
Ray Data processes large datasets with a streaming execution model. Instead of loading the entire dataset into memory, it builds a pipeline of operations that stream data through the system, which is essential for inference and training workloads that cannot fit wholly in memory.
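The intuition behind streaming execution can be sketched with plain Python generators (an analogy, not Ray's implementation): each stage pulls one block at a time from the previous stage, so only a few blocks are in memory at any moment rather than the whole dataset.

```python
# Illustrative sketch: each stage yields one block at a time,
# so memory usage is bounded by a few blocks, not the full dataset.
def read_blocks(n_blocks, rows_per_block):
    for b in range(n_blocks):
        yield [b * rows_per_block + i for i in range(rows_per_block)]

def double(blocks):
    for block in blocks:
        yield [v * 2 for v in block]

def keep_multiples_of_four(blocks):
    for block in blocks:
        yield [v for v in block if v % 4 == 0]

pipeline = keep_multiples_of_four(double(read_blocks(n_blocks=3, rows_per_block=4)))
results = [block for block in pipeline]  # execution is driven by consumption
print(results)  # [[0, 4], [8, 12], [16, 20]]
```

Nothing runs when the generators are composed; work happens only as downstream consumers pull blocks, mirroring how Ray Data defers execution until an action is called.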
Example:
import ray
# Load the Iris dataset (150 rows) from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Define a pipeline of operations
ds = ds.map(lambda x: {"target1": x["target"] * 2})
ds = ds.map(lambda x: {"target2": x["target1"] * 2})
ds = ds.map(lambda x: {"target3": x["target2"] * 2})
# Filter rows where target3 is divisible by 4
ds = ds.filter(lambda x: x["target3"] % 4 == 0)
# Trigger execution
ds.show(5)

The logical plan generated by this pipeline looks like:

Filter(lambda)
+- Map(lambda)
   +- Map(lambda)
      +- Map(lambda)
         +- Dataset(schema={...})

Operators are connected in a pipeline: each operator’s output queue feeds directly into the next operator’s input queue, enabling high‑throughput, concurrent execution of CPU‑bound and GPU‑bound stages.
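The same chain of lazy transformations can be mimicked with Python's built-in map and filter (a stdlib analogy for the plan above, not Ray's distributed execution): both return lazy iterators, and nothing executes until the result is consumed, just as ds.show() triggers the plan.

```python
# Stdlib analogy of the pipeline above: map/filter build a lazy
# iterator chain; nothing runs until the result is consumed.
rows = [{"target": t} for t in (0, 1, 2)]

pipe = map(lambda x: {**x, "target1": x["target"] * 2}, rows)
pipe = map(lambda x: {**x, "target2": x["target1"] * 2}, pipe)
pipe = map(lambda x: {**x, "target3": x["target2"] * 2}, pipe)
pipe = filter(lambda x: x["target3"] % 4 == 0, pipe)

result = list(pipe)  # consumption drives execution, like ds.show(5)
print([r["target3"] for r in result])  # [0, 8, 16]
```

The three sample target values (0, 1, 2) are invented for illustration; in the real pipeline they come from the iris CSV's target column.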
Note: Operators such as ds.sort() and ds.groupby() materialise data, which may increase memory usage for very large datasets.
Quick Start
Loading Data
Ray Data can create datasets from local files, Python objects, or cloud storage services (S3, GCS, etc.) that support the Arrow filesystem interface.
import ray
# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Preview the first record
ds.show(limit=1)

Sample output:

{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}

Transforming Data
User-defined functions (UDFs) can be applied to datasets via map_batches(), and Ray parallelises these transformations across the cluster.
from typing import Dict
import numpy as np
def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
vec_a = batch["petal length (cm)"]
vec_b = batch["petal width (cm)"]
batch["petal area (cm^2)"] = vec_a * vec_b
return batch
transformed_ds = ds.map_batches(transform_batch)
# Execute lazy transformations and materialise the dataset
print(transformed_ds.materialize())

The materialised dataset now includes a new column petal area (cm^2) in its schema.
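The UDF itself is ordinary NumPy code over a dict of arrays, so it can be exercised on a hand-made batch before handing it to map_batches. The sample values below are invented for illustration (they merely resemble iris measurements):

```python
import numpy as np

# Same UDF as above, applied to a hand-made batch. In practice the
# batch dict is constructed by Ray Data, not by hand.
def transform_batch(batch):
    batch["petal area (cm^2)"] = (
        batch["petal length (cm)"] * batch["petal width (cm)"]
    )
    return batch

batch = {
    "petal length (cm)": np.array([1.4, 4.7]),
    "petal width (cm)": np.array([0.2, 1.4]),
}
out = transform_batch(batch)
print(out["petal area (cm^2)"])  # element-wise products, ~[0.28, 6.58]
```

Testing the function this way is possible precisely because map_batches passes batches as plain dicts of column arrays.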
Consuming Data
Ray Data provides APIs such as take_batch() and iter_batches() for extracting data, or a Dataset can be passed directly to Ray Tasks or Actors.
# Extract the first 3 rows as a batch
print(transformed_ds.take_batch(batch_size=3))

Sample output shows NumPy arrays for each column, including the newly computed petal area (cm^2).
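Batch iteration can be sketched in pure Python to show what iter_batches() does conceptually (an analogy, not the Ray Data implementation): the dataset is carved into fixed-size chunks and each chunk is pivoted into a columnar batch.

```python
# Pure-Python sketch of batch iteration: yield fixed-size
# columnar batches (dicts mapping column name -> list of values).
def iter_batches(rows, batch_size):
    for i in range(0, len(rows), batch_size):
        chunk = rows[i:i + batch_size]
        yield {key: [row[key] for row in chunk] for key in chunk[0]}

rows = [{"target": t} for t in range(7)]
batches = list(iter_batches(rows, batch_size=3))
print(len(batches), batches[0])  # 3 {'target': [0, 1, 2]}
```

As with streaming execution generally, the generator yields one batch at a time, so a consumer such as a training loop never needs the whole dataset in memory.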
Saving Data
Processed datasets can be written to various formats (Parquet, CSV, etc.) and stored locally or in cloud storage.
import os
# Write the dataset as Parquet files
transformed_ds.write_parquet("/tmp/iris")
# Verify the files were created
print(os.listdir("/tmp/iris"))

The directory contains shard files such as ..._000000.parquet and ..._000001.parquet.
Big Data Technology Tribe
Focused on computer science and cutting‑edge tech, we distill complex knowledge into clear, actionable insights. We track tech evolution, share industry trends and deep analysis, helping you keep learning, boost your technical edge, and ride the digital wave forward.