How to Build a Scalable Multimodal Data Pipeline with Alibaba Cloud PAI and DataJuicer

This article details a step‑by‑step guide for constructing a high‑performance multimodal data pipeline—covering video segmentation, duration filtering, frame extraction, safety and aesthetic scoring, and caption generation—using Alibaba Cloud PAI, Paimon, DataJuicer, and distributed frameworks like Ray and Daft, with real‑world performance metrics.

Alibaba Cloud Big Data AI Platform
Alibaba Cloud Big Data AI Platform
Alibaba Cloud Big Data AI Platform
How to Build a Scalable Multimodal Data Pipeline with Alibaba Cloud PAI and DataJuicer

Introduction

As artificial intelligence moves from single‑modality to full‑scene multimodal fusion, systems must ingest and process terabytes to petabytes of heterogeneous data such as video, audio, sensor streams, and contextual metadata. Traditional architectures struggle with storage bottlenecks, low compute efficiency, and poor scalability, limiting model iteration speed.

Solution Architecture

The proposed solution combines Alibaba Cloud AI Platform PAI with the streaming data‑lake warehouse Paimon to create a high‑performance, elastic data‑processing chain. DataJuicer provides a rich library of operators that can be executed on either the Ray distributed compute framework or the newer Daft data‑processing engine.

Pipeline Overview

The pipeline processes typical multimodal scenarios such as intelligent video surveillance, multimedia content analysis, and human‑machine interaction. It consists of the following stages:

1. Scene Segmentation

Use ContentDetector to split videos into coherent scene clips when frame‑wise differences exceed a threshold.

2. Duration Filtering

Discard clips that are too short or too long to ensure downstream models receive well‑bounded inputs.

3. Video Frame Extraction

Extract representative frames (keyframes) from each clip, performing basic quality checks and storing results in the video_frames field.

4. Content Safety Filtering

Apply an NSFW detection model ( Falconsai/nsfw_image_detection) to compute an average safety score per clip; retain clips with scores between 0.0 and 0.5.

5. Dynamic Motion Scoring

Calculate motion intensity via optical‑flow analysis and keep clips whose motion score is above 0.25 (no upper bound).

6. Aesthetic Scoring

Use a pretrained aesthetic model (

shunk031/aesthetics-predictor‑v2‑sac‑logos‑ava1‑l14‑linearMSE

) and retain clips with average scores between 0.4 and 1.0.

7. Video Caption Generation

Feed the selected frames into a multimodal captioning model ( kpyu/video‑blip‑opt‑2.7b‑ego4d) to produce natural‑language descriptions for each clip.

Performance Evaluation

Small‑scale test : 6,865 videos (52.4 h total) processed on an L20 node (8 GPU + 128 CPU) in ~20 minutes. GPU utilization charts are shown in the original figures.

Large‑scale test : 2 M videos (30 k h total) processed on a 45‑node cluster (each node 8 GPU + 180 CPU) in ~200 minutes, with GPU utilization consistently high. Detailed utilization graphs are included.

Best Practices

Prepare the data lake and install required Python packages: pip install py-data-juicer pypaimon.

Configure OSS credentials and DLF catalog settings as shown in the code snippets.

Choose between Ray and Daft implementations based on existing infrastructure; both wrap DataJuicer operators.

Ray‑Based Implementation

# Install dependencies
pip install py-data-juicer pypaimon

# Example of reading from DLF and processing with Ray
import os, sys, copy, pyarrow as pa, ray
from data_juicer.ops.mapper import VideoSplitByKeyFrameMapper, VideoExtractFramesMapper, VideoCaptioningFromVideoMapper
# ... (full code omitted for brevity) ...

Daft‑Based Implementation

# Convert DataJuicer operators to Daft UDFs
from daft import udf, DataType

def dj_operator_to_daft(dj_op_instance, input_columns, output_columns=None, resource_config=None):
    # Wrapper implementation (see source) ...
    return WrappedBatchOp

# Example usage
split_video_op = VideoSplitByKeyFrameMapper(...)
VideoSplitUDF = dj_operator_to_daft(split_video_op, ["videos", "text"], {"clips": DataType.list(DataType.binary())}, {"num_cpus": 4})

Running the Full Pipeline with Daft

import daft, uuid, os, time
from daft import DataType

# Load video files
df = daft.from_glob_path("./data/Youku-AliceMind/caption/validation/videos").with_column_renamed("path", "videos")
# Apply operators sequentially
df = df.with_column("clips", VideoSplitUDF(df["videos"], df["text"]))
df = df.explode("clips")
# Duration filter, keyframe extraction, safety, aesthetics, captioning ...
# Write results
df.write_parquet("./outputs/daft/out_parquet", write_mode="overwrite")
print(f"Total cost time: {time.time() - s_time}")

Product Overview (Brief)

Alibaba Cloud AI Platform PAI offers elastic high‑performance compute clusters, advanced resource scheduling, deep integration with open‑source ecosystems, and a fully managed experience, making it suitable for large‑scale multimodal data processing. The Paimon lake‑warehouse provides a unified streaming‑batch storage format with high‑throughput writes and low‑latency queries, compatible with Flink, Spark, Hive, and Trino.

Architecture diagram
Architecture diagram
data pipelineAImultimodalAlibaba CloudRayDaftDataJuicer
Alibaba Cloud Big Data AI Platform
Written by

Alibaba Cloud Big Data AI Platform

The Alibaba Cloud Big Data AI Platform builds on Alibaba’s leading cloud infrastructure, big‑data and AI engineering capabilities, scenario algorithms, and extensive industry experience to offer enterprises and developers a one‑stop, cloud‑native big‑data and AI capability suite. It boosts AI development efficiency, enables large‑scale AI deployment across industries, and drives business value.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.