How Ray Data Accelerates AI Workloads with Streaming Execution

Ray Data is a scalable data processing library built on Ray. It offers high‑performance, streaming‑execution APIs for AI workloads, enabling efficient batch inference, data preprocessing, and training‑data ingestion across CPU and GPU resources, while supporting diverse data formats and integrating with popular frameworks.


Core Concepts in Ray Data

Datasets and Blocks

A Dataset is the primary Python API representing a distributed collection of rows. Users create a Dataset from external storage or in‑memory objects, apply lazy transformations, and then either write the results out or feed them to training jobs. Transformations are executed only when an action such as show() or materialize() is called, allowing Ray Data to optimise the execution plan.
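As a brief illustration of this lazy behaviour, here is a minimal sketch using ray.data.range, which produces rows with an "id" column:

import ray
# Create a small dataset; the map() below only records a step in the logical plan
ds = ray.data.range(1000)
ds = ds.map(lambda row: {"id": row["id"] + 1})
# Nothing has executed yet; an action such as show() triggers the plan
ds.show(3)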

A Block is a columnar partition of a Dataset (e.g., an Apache Arrow table). Each Dataset is split into multiple blocks, and processing occurs at the block level, enabling parallel execution across the cluster.

As an illustration, consider a dataset composed of three blocks, each containing 1,000 rows. Internally, blocks are stored in Ray’s shared object store and can be accessed as Pandas DataFrames or PyArrow Tables.
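To see both in‑memory representations, you can iterate over a dataset in batches and choose the format per call. A small sketch, assuming the batch_format argument of iter_batches (a standard Ray Data parameter):

import ray
ds = ray.data.range(3000)
# Request batches as Pandas DataFrames
for batch in ds.iter_batches(batch_size=1000, batch_format="pandas"):
    print(type(batch))  # <class 'pandas.core.frame.DataFrame'>
    break
# Request batches as PyArrow Tables
for batch in ds.iter_batches(batch_size=1000, batch_format="pyarrow"):
    print(type(batch))  # <class 'pyarrow.lib.Table'>
    break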

Streaming Execution Model

Ray Data processes large datasets with a streaming execution model. Instead of loading the entire dataset into memory, it builds a pipeline of operations that stream data through the system, which is essential for inference and training workloads that cannot fit wholly in memory.

Example:

import ray
# Load the Iris dataset from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Define a pipeline of operations
ds = ds.map(lambda x: {"target1": x["target"] * 2})
ds = ds.map(lambda x: {"target2": x["target1"] * 2})
ds = ds.map(lambda x: {"target3": x["target2"] * 2})
# Filter rows where target3 is divisible by 4
ds = ds.filter(lambda x: x["target3"] % 4 == 0)
# Trigger execution
ds.show(5)

The logical plan generated by this pipeline looks like:

Filter(lambda)
+- Map(lambda)
   +- Map(lambda)
      +- Map(lambda)
         +- Dataset(schema={...})

Operators are connected in a pipeline where each operator’s output queue feeds directly into the next operator’s input queue, enabling high‑throughput, concurrent execution of CPU‑bound and GPU‑bound stages.

Note: Operators such as ds.sort() and ds.groupby() materialise data, which may increase memory usage for very large datasets.
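To mix CPU‑bound and GPU‑bound stages in one pipeline, a class‑based UDF can hold model state while num_gpus reserves accelerator resources per actor. The sketch below is illustrative only: Predictor is a placeholder model, and the num_gpus and concurrency arguments assume a cluster with GPUs available.

import ray

class Predictor:
    def __init__(self):
        # Placeholder for loading a real model onto the GPU (done once per actor)
        self.model = lambda x: x

    def __call__(self, batch):
        batch["prediction"] = self.model(batch["value"])
        return batch

ds = ray.data.range(10_000)
# CPU-bound preprocessing stage
ds = ds.map_batches(lambda batch: {"value": batch["id"] * 2})
# GPU-bound inference stage running on a pool of two actors, one GPU each
ds = ds.map_batches(Predictor, num_gpus=1, concurrency=2)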

Quick Start

Loading Data

Ray Data can create datasets from local files, Python objects, or cloud storage services (S3, GCS, etc.) that support the Arrow filesystem interface.

import ray
# Load a CSV dataset directly from S3
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Preview the first record
ds.show(limit=1)

Sample output:

{'sepal length (cm)': 5.1, 'sepal width (cm)': 3.5, 'petal length (cm)': 1.4, 'petal width (cm)': 0.2, 'target': 0}
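Datasets can also be built from in‑memory Python objects rather than external storage. A minimal sketch using ray.data.from_items:

import ray
# Create a dataset from a list of dicts held in memory
ds_local = ray.data.from_items([
    {"sepal length (cm)": 5.1, "target": 0},
    {"sepal length (cm)": 4.9, "target": 0},
])
ds_local.show(limit=1)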

Transforming Data

Custom functions (UDFs) can be applied to datasets via map_batches. Ray parallelises these transformations across the cluster.

from typing import Dict
import numpy as np

def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    vec_a = batch["petal length (cm)"]
    vec_b = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = vec_a * vec_b
    return batch

transformed_ds = ds.map_batches(transform_batch)
# Execute lazy transformations and materialise the dataset
print(transformed_ds.materialize())

The materialised dataset now includes a new column petal area (cm^2) in its schema.
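The same transformation can also be written against Pandas batches by passing batch_format="pandas" to map_batches. A sketch, assuming pandas is installed:

import pandas as pd

def transform_batch_pandas(batch: pd.DataFrame) -> pd.DataFrame:
    # Each batch arrives as a DataFrame instead of a dict of NumPy arrays
    batch["petal area (cm^2)"] = batch["petal length (cm)"] * batch["petal width (cm)"]
    return batch

transformed_pd_ds = ds.map_batches(transform_batch_pandas, batch_format="pandas")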

Consuming Data

Ray Data provides APIs such as take_batch() and iter_batches() for extracting data; a Dataset can also be passed directly to Ray Tasks or Actors.

# Extract the first 3 rows as a batch
print(transformed_ds.take_batch(batch_size=3))

Sample output shows NumPy arrays for each column, including the newly computed petal area (cm^2).
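For training‑style consumption, batches can be streamed with iter_batches instead of pulled all at once. A brief sketch; the batch_size value here is arbitrary:

# Stream batches of 64 rows through the consumer
for batch in transformed_ds.iter_batches(batch_size=64):
    pass  # e.g. feed each batch to a training step
# PyTorch users can call transformed_ds.iter_torch_batches(batch_size=64)
# to receive batches of torch tensors (requires torch to be installed)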

Saving Data

Processed datasets can be written to various formats (Parquet, CSV, etc.) and stored locally or in cloud storage.

import os
# Write the dataset as Parquet files
transformed_ds.write_parquet("/tmp/iris")
# Verify the files were created
print(os.listdir("/tmp/iris"))

The directory contains shard files such as ..._000000.parquet and ..._000001.parquet.
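Other writers follow the same pattern; for example, CSV output (a one‑line sketch):

# Write the same dataset as CSV shards
transformed_ds.write_csv("/tmp/iris_csv")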
