How DeepSeek Smallpond Powers AI Data Processing with Ray and DuckDB

This article introduces DeepSeek Smallpond, a lightweight yet high‑performance AI data‑processing engine built on Ray and DuckDB, explains its dual Dataframe and LogicalPlan APIs, showcases integration with Volcano Engine's AI Data Lake LAS, and provides practical code examples for distributed processing, multimodal storage, and RAG pipelines.

Volcano Engine Developer Services
Volcano Engine Developer Services
Volcano Engine Developer Services
How DeepSeek Smallpond Powers AI Data Processing with Ray and DuckDB

DeepSeek Smallpond Introduction

Smallpond is an open‑source, lightweight data‑processing engine for AI released by DeepSeek. It is built on Ray and DuckDB and offers five main advantages: lightweight, high performance, scalability, no operational overhead, and per‑job resource scheduling.

APIs and Quick Start

Smallpond provides two APIs:

Dataframe API – a high‑level interface similar to Pandas or PySpark.

LogicalPlan API – a low‑level, flexible interface for complex data‑processing logic.

import smallpond
sp = smallpond.init()

df = sp.read_parquet("path/to/dataset/*.parquet")
df = df.repartition(10)
df = df.map("x + 1")
df.write_parquet("path/to/output")
from smallpond.logical.dataset import ParquetDataSet
from smallpond.logical.node import Context, DataSourceNode, DataSetPartitionNode, SqlEngineNode, LogicalPlan
from smallpond.execution.driver import Driver

def my_pipeline(input_paths: List[str], npartitions: int):
    ctx = Context()
    dataset = ParquetDataSet(input_paths)
    node = DataSourceNode(ctx, dataset)
    node = DataSetPartitionNode(ctx, (node,), npartitions=npartitions)
    node = SqlEngineNode(ctx, (node,), "SELECT * FROM {0}")
    return LogicalPlan(ctx, node)

if __name__ == "__main__":
    driver = Driver()
    driver.add_argument("-i", "--input_paths", nargs="+")
    driver.add_argument("-n", "--npartitions", type=int, default=10)
    plan = my_pipeline(**driver.get_arguments())
    driver.run(plan)

Smallpond supports two execution engines: Ray and a built‑in engine, selectable via the mode parameter.

Architecture Overview

The architecture resembles Spark’s batch‑SQL kernel and consists of four layers:

Storage Layer – stores source and intermediate data, mountable locally, supports 3FS and fsspec interfaces.

Engine Layer – Ray or Built‑in engine, selected at runtime.

Execution Layer – LogicalPlan, optimizer, and physical plan generation; tasks run on DuckDB or Arrow.

API Layer – high‑level Dataframe API and low‑level LogicalPlan API.

Key Features

Ray‑based distributed scheduling and execution, offering superior performance over the built‑in engine.

MPI support with NUMA binding for efficient collective communication.

Highly flexible low‑level API allowing custom Python scripts and non‑SQL operators.

Integration with 3FS, fuse, and fsspec for cloud storage access.

Volcano Engine AI Data Lake LAS Overview

LAS addresses the challenges of multimodal, unstructured data in AI scenarios, providing capabilities for data management, computation, storage, and AI‑scene support (pre‑training, fine‑tuning, knowledge base, AI search, agents, compliance).

Dataset Management – versioned datasets, distributed processing for large‑scale pre‑training, data insight and fine‑grained editing for SFT.

Unified Catalog – register data as catalog tables for built‑in analytics.

Rich Operators – 100+ operators for text, image, video, audio, and documents.

Workflow Support – run Python, Spark, Ray jobs within the platform.

Multi‑format/Data Source – supports Lance, Iceberg, Parquet, JSON, CSV, VikingDB, OpenSearch, etc.

Storage‑Compute Separation – Proton cache service accelerates TOS data access.

Integration of Smallpond and LAS

Smallpond can be combined with LAS’s Ray compute resources, offering a cloud solution with five advantages: simple environment preparation, resource isolation, authentication, unified management, and K8s scheduling.

sp = smallpond.init(ray_address="ray://192.xxxx:10001")

Smallpond also integrates with Proton/TOS‑FS for cloud storage, supporting both 3FS and fsspec protocols.

Lance as a Multimodal Data Lake

Lance is a columnar storage format designed for multimodal data (video, image, audio, tabular). It supports versioning, multi‑dimensional analysis, fast random access, vector search (IVF‑PQ, IVF‑HNSW), and an open ecosystem compatible with Python/Java, Arrow, and AI frameworks.

import lance
import arrow
from smallpond.logical.dataset import ArrowTableDataSet

# Load Lance dataset
lance_ds = lance.dataset("example.lance")
# Convert to Arrow Table
arrow_table = lance_ds.to_table()
# Create Smallpond dataset
smallpond_dataset = ArrowTableDataSet()

Practical Example: RAG Offline Ingestion

A pipeline reads data from object storage, uses Smallpond to chunk and embed texts, and writes vectors to VikingDB. The steps are:

Create LAS compute resources (CPU/GPU queues).

Implement processors for chunking, embedding (using BGE‑M3), and sinking to VikingDB.

Submit the script to the LAS platform, which runs the Python code directly.

import copy, logging
import pyarrow as pa
import smallpond
from FlagEmbedding import FlagModel
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from volcengine.viking_db import Data, VikingDBService

class ChunkProcessor:
    """Split text into chunks for retrieval."""
    def __init__(self, input_col_name, output_col_name="chunk", chunk_size=500, chunk_overlap=50):
        self.input_col_name = input_col_name
        self.output_col_name = output_col_name
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self._sentence_splitter = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
    def __call__(self, row: dict, **kwargs) -> list[dict]:
        text_list = [row[self.input_col_name]]
        documents = [Document(text=t) for t in text_list]
        nodes = self._sentence_splitter.get_nodes_from_documents(documents)
        result = []
        for node in nodes:
            if not node.text:
                continue
            new_row = copy.deepcopy(row)
            new_row[self.output_col_name] = node.text
            result.append(new_row)
        return result

class EmbeddingProcessor:
    """Compute embeddings using a BGE model."""
    def __init__(self, input_col_name, output_col_name="embedding"):
        self.input_col_name = input_col_name
        self.output_col_name = output_col_name
        self._model = FlagModel("BAAI/bge-m3", use_fp16=True)
    def __call__(self, table: pa.Table) -> pa.Table:
        df = table.to_pandas()
        contents = df[self.input_col_name].tolist()
        embeddings = self._model.encode(contents)
        df[self.output_col_name] = [e.tolist() for e in embeddings]
        return pa.Table.from_pandas(df, preserve_index=False)

class VikingdbSinkProcessor:
    """Write data to Volcano's VikingDB vector database."""
    def __init__(self, collection_name, **kwargs):
        self.collection_name = collection_name
        service = VikingDBService(host=vikingdb_endpoint, region=vikingdb_region, scheme="http")
        service.set_ak(vikingdb_ak)
        service.set_sk(vikingdb_sk)
        self.collection = service.get_collection(self.collection_name)
    def _upsert_data(self, df):
        datas = [Data(row.to_dict()) for _, row in df.iterrows()]
        for i in range(0, len(datas), 10):
            batch = datas[i:i+10]
            self.collection.upsert_data(batch, async_upsert=True)
    def __call__(self, table: pa.Table):
        self._upsert_data(table.to_pandas())
        return table

if __name__ == "__main__":
    sp = smallpond.init()
    sp.read_csv(paths, schema)\
      .flat_map(ChunkProcessor(input_col_name="index"))\
      .map_batches(EmbeddingProcessor(input_col_name="chunk"))\
      .map_batches(VikingdbSinkProcessor(collection_name=vikingdb_dataset))\
      .take_all()

Conclusion and Outlook

Smallpond is a promising, lightweight, high‑performance AI data‑processing framework that complements LAS’s multimodal data‑lake capabilities. Future work includes better Ray cluster integration, broader storage protocol support (S3, TOS), and richer multimodal format adapters such as Lance, LMDB, WebDataset, and Pickle.

data lakeDistributed ComputingRayDuckDBLanceAI data processingSmallpond
Volcano Engine Developer Services
Written by

Volcano Engine Developer Services

The Volcano Engine Developer Community, Volcano Engine's TOD community, connects the platform with developers, offering cutting-edge tech content and diverse events, nurturing a vibrant developer culture, and co-building an open-source ecosystem.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.