
Vivo’s 800‑Day Journey Optimizing Celeborn Remote Shuffle Service at PB Scale

This technical report describes how Vivo’s big‑data platform evaluated remote shuffle services, adopted Celeborn, tuned hardware and software configurations, and implemented performance and stability enhancements, and it outlines planned operational and community‑driven improvements for handling petabyte‑scale shuffle workloads.

vivo Internet Technology

Background

Vivo’s data platform faced rapid growth in data volume and job count, exhausting compute resources. Online services peak during the day while offline batch jobs peak at night, leaving many servers under‑utilized. To improve elasticity, Vivo partnered with its container team in 2023 to run massive offline Spark jobs on shared Kubernetes clusters, evaluating YARN‑on‑K8s and Spark‑on‑K8s solutions.

RSS Initial Experience

After benchmarking three mainstream remote shuffle services—Celeborn, Uber RemoteShuffleService, and Firestorm—Vivo selected Celeborn for three reasons:

Ecosystem compatibility: Native support for Spark 3.x dynamic allocation, unlike Uber RSS (limited to Spark 2.4/3.0) and Firestorm (no Spark 3.2 support).

Observability & operations friendliness: Integrated Prometheus metrics enable real‑time bottleneck detection; the other two services offered little comparable built‑in monitoring at the time.

Performance & stability: Off‑heap memory reduces JVM GC pauses; slot‑based concurrency control prevents worker overload.

Celeborn’s lack of DFS support was acceptable as a temporary limitation.

Hardware Adaptation and Testing

Three server models were benchmarked. Model B gave the best overall performance; Model C’s high‑speed SSDs were hampered by RAID‑5 overhead and limited capacity, which caused data congestion. From these tests Vivo derived four hardware selection principles:

Prioritize the number of disks over individual SSD speed.

Avoid excessive RAID; temporary shuffle data can tolerate minor loss.

Allocate roughly 250 GB of memory for petabyte‑scale shuffle workloads.

Reuse older, long‑served machines because Celeborn tolerates modest hardware.

Service Robustness

Master High‑Availability

Celeborn uses Raft for master high availability. Rolling restarts of the master nodes never impacted running tasks, and the shuffle service remained uninterrupted. In a three‑master setup the cluster tolerated the loss of a single master; because Raft requires a majority to elect a leader, at least two of the three masters must remain alive.

Hot‑Update Validation

In Kubernetes, worker IPs were pinned to prevent the master from accumulating dirty metadata after frequent pod restarts. Dual‑replica deployment was confirmed as mandatory for production: it lengthens the data‑transfer window but guarantees continuity if a worker is lost.

Performance Testing

On a 3‑master + 5‑worker cluster, Celeborn consistently outperformed the baseline ESS:

At 5.9 TB, Celeborn took only 47 % of ESS’s time.

At 28.3 TB, Celeborn remained ~20 % faster.

Beyond 79.3 TB, ESS failed completely while Celeborn completed all cases.

A data‑loss issue (CELEBORN‑383) was reported and quickly fixed by the community.

800‑Day Experience – Optimizations

1. Asynchronous OpenStream Requests

Problem: Shuffle read latency spiked from milliseconds to seconds when many workers performed large file sorting; OpenStream calls blocked worker threads.

Root cause: OpenStream occupied threads until sorting finished, creating a thread‑starvation loop.

Solution: Modified the client to issue OpenStream requests asynchronously, so threads are no longer blocked while workers sort files. The change was contributed upstream in https://github.com/apache/celeborn/pull/2133.
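The following is a minimal sketch of the idea rather than the actual patch in the pull request above: the client hands the OpenStream request to the transport layer with a callback and completes a Future when the worker replies, so no caller thread is parked while the worker sorts its files. OpenStreamRequest, OpenStreamResponse, RpcClient, and sendAsync are illustrative names, not Celeborn’s real API.

import scala.concurrent.{Future, Promise}

// Illustrative request/response shapes for opening a shuffle read stream.
final case class OpenStreamRequest(shuffleKey: String, fileName: String)
final case class OpenStreamResponse(streamId: Long, numChunks: Int)

trait RpcClient {
  // Fire-and-forget send; the callback runs only when the worker replies,
  // so no thread waits while the worker sorts the shuffle file.
  def sendAsync(req: OpenStreamRequest)(onReply: OpenStreamResponse => Unit): Unit
}

class AsyncOpenStreamClient(rpc: RpcClient) {
  def openStream(req: OpenStreamRequest): Future[OpenStreamResponse] = {
    val promise = Promise[OpenStreamResponse]()
    rpc.sendAsync(req)(resp => promise.success(resp)) // caller thread returns immediately
    promise.future
  }
}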

2. Small‑File Cache

Problem: Under >70 % cluster load, KB‑level file reads sometimes took minutes.

Root cause: OS FIFO scheduling caused small‑file I/O to wait behind large, random requests.

Implementation: Added a FileMemCacheManager that caches eligible shuffle files during the commit phase, allowing reads to be served from memory.

// A pending chunk-fetch request: the client connection it arrived on, the
// fetch request itself, and the time the fetch began (used to track read latency).
class FetchChunkTask(val client: TransportClient,
                     val request: ChunkFetchRequest,
                     val fetchBeginTime: Long) {
  def getClient: TransportClient = client
  def getRequest: ChunkFetchRequest = request
  def getBeginTime: Long = fetchBeginTime
}
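FetchChunkTask above only describes a queued read; the cache itself could look roughly like the sketch below. The method names, size threshold, and capacity check are assumptions made for illustration, not Celeborn’s actual FileMemCacheManager implementation: eligible files are copied into memory during the commit phase, and reads fall back to disk on a cache miss.

import java.nio.file.{Files, Paths}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Illustrative small-file cache populated at commit time (names and limits assumed).
class FileMemCacheManager(maxFileBytes: Long = 256L * 1024, capacityBytes: Long = 4L << 30) {
  private val cache = new ConcurrentHashMap[String, Array[Byte]]()
  private val usedBytes = new AtomicLong(0L)

  // Called during the commit phase: cache files that are small enough to fit.
  def maybeCache(filePath: String): Unit = {
    val path = Paths.get(filePath)
    val size = Files.size(path)
    if (size <= maxFileBytes && usedBytes.get() + size <= capacityBytes) {
      cache.put(filePath, Files.readAllBytes(path))
      usedBytes.addAndGet(size)
    }
  }

  // Shuffle reads hit memory first and fall back to disk otherwise.
  def read(filePath: String): Array[Byte] =
    Option(cache.get(filePath)).getOrElse(Files.readAllBytes(Paths.get(filePath)))
}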

Result: Maximum shuffle‑read latency dropped from 4 minutes to under 2 minutes; average latency fell from >200 ms to <60 ms.

3. Disk‑Level Thread Control

Problem: In multi‑disk workers (12‑24 disks), a few disks saturated at 100 % I/O while others stayed idle, limiting throughput.

Analysis: A global thread pool let a hot disk monopolize threads, causing excessive context switches and retries.

Solution: Introduced per‑disk DiskReader objects with a thread‑limit and a per‑disk FetchTask queue. Workers now balance load across disks, reducing the impact of a single failing disk by ~80 %.

Disk I/O utilization became balanced.

Node‑level throughput degradation due to a bad disk was limited.
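A minimal sketch of the per‑disk reader idea, assuming a hypothetical FetchTask type and illustrative class names (the real Celeborn classes differ): each mount point gets its own bounded thread pool and queue, so a saturated or failing disk can only stall its own readers.

import java.util.concurrent.{Executors, LinkedBlockingQueue}

// Illustrative fetch task; in Celeborn this carries the actual chunk request.
final case class FetchTask(filePath: String, chunkIndex: Int)

class DiskReader(mountPoint: String, threadLimit: Int) {
  private val queue = new LinkedBlockingQueue[FetchTask]()
  private val pool = Executors.newFixedThreadPool(threadLimit)

  // Each of the threadLimit threads drains only this disk's queue, so a hot
  // disk cannot monopolize the worker's read threads.
  (1 to threadLimit).foreach { _ =>
    pool.submit(new Runnable {
      override def run(): Unit =
        while (!Thread.currentThread().isInterrupted) serve(queue.take())
    })
  }

  def submit(task: FetchTask): Unit = queue.put(task)

  private def serve(task: FetchTask): Unit = {
    // read the requested chunk from this disk and send the response
  }
}

// Route each fetch to the DiskReader that owns the file's mount point.
class FetchDispatcher(mountPoints: Seq[String], threadsPerDisk: Int = 8) {
  private val readers = mountPoints.map(mp => mp -> new DiskReader(mp, threadsPerDisk)).toMap
  def dispatch(mountPoint: String, task: FetchTask): Unit = readers(mountPoint).submit(task)
}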

4. Dynamic Slot Allocation (LoadAware)

Problem: The default RoundRobin slot allocation could overload disks; the LoadAware strategy required manual tuning of celeborn.slots.assign.loadAware.discardDisk.maxWeight and thresholds.

Improvement: Implemented an automatic discarding mechanism that calculates the maximum tolerable number of high‑load disks (e.g., 0.3 × total disks) and excludes disks from slot assignment when their read/write latency exceeds 200 ms.
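A sketch of that discard calculation under the stated assumptions; DiskStatus and its field names are illustrative, and the real logic lives inside Celeborn’s LoadAware slot allocator.

// Illustrative per-disk latency snapshot.
final case class DiskStatus(mountPoint: String, avgFlushLatencyMs: Double, avgFetchLatencyMs: Double)

object LoadAwareDiscard {
  private val LatencyThresholdMs = 200.0 // read/write latency above this marks a disk as high-load
  private val MaxDiscardRatio = 0.3      // never discard more than 0.3 x total disks

  // Return the disks that should still receive slots.
  def selectUsableDisks(disks: Seq[DiskStatus]): Seq[DiskStatus] = {
    val maxDiscard = math.floor(disks.size * MaxDiscardRatio).toInt
    val (highLoad, healthy) = disks.partition { d =>
      d.avgFlushLatencyMs > LatencyThresholdMs || d.avgFetchLatencyMs > LatencyThresholdMs
    }
    // Discard at most maxDiscard of the worst disks so the worker never loses
    // more capacity than the configured tolerance.
    val discarded = highLoad
      .sortBy(d => -(d.avgFlushLatencyMs + d.avgFetchLatencyMs))
      .take(maxDiscard)
    healthy ++ highLoad.filterNot(discarded.contains)
  }
}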

5. Client Authentication & Traffic Shaping

Problem: Open master URLs allowed unrestricted client access; large shuffle jobs could destabilize the cluster.

Solution: Clients now specify a cluster identifier; the Vivo config center resolves the actual master URL, enabling hot‑migration of masters during failures.
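A hypothetical sketch of the resolution step (ConfigCenterClient and the lookup key are stand‑ins for Vivo’s internal config center, not a public API): the client submits only a cluster identifier, and the resolver returns the current master endpoints to feed into the job’s Celeborn configuration.

// Minimal abstraction over the config center; lookup key format is assumed.
trait ConfigCenterClient {
  def lookup(key: String): Option[String] // e.g. "host1:9097,host2:9097,host3:9097"
}

class CelebornEndpointResolver(configCenter: ConfigCenterClient) {
  def masterEndpoints(clusterId: String): Seq[String] =
    configCenter
      .lookup(s"celeborn.cluster.$clusterId.masters")
      .map(_.split(",").map(_.trim).toSeq)
      .getOrElse(throw new IllegalArgumentException(s"Unknown Celeborn cluster: $clusterId"))
}

Because the lookup happens at submission time, masters can be migrated by updating the config‑center entry rather than changing client jobs.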

6. Congestion Control

Implemented user‑level and application‑level push‑traffic monitoring based on sliding‑window statistics. Applications exceeding 200 MB/s or a multiple of the average traffic trigger back‑pressure strategies (e.g., pause‑then‑resume).
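A minimal sketch of the sliding‑window accounting, assuming one‑second buckets and per‑application tracking; only the 200 MB/s threshold comes from the text, everything else is illustrative.

import java.util.concurrent.ConcurrentHashMap

class PushTrafficMonitor(windowSlots: Int = 10, slotMillis: Long = 1000L) {
  private case class Window(buckets: Array[Long], var lastSlot: Long)
  private val windows = new ConcurrentHashMap[String, Window]()

  // Record pushed bytes for an application in the current time bucket.
  def record(appId: String, bytes: Long): Unit = {
    val nowSlot = System.currentTimeMillis() / slotMillis
    val win = windows.computeIfAbsent(appId, _ => Window(Array.fill(windowSlots)(0L), nowSlot))
    win.synchronized {
      if (nowSlot != win.lastSlot) {
        // Clear buckets that rotated out of the window since the last update.
        val advance = math.min(nowSlot - win.lastSlot, windowSlots.toLong).toInt
        (0 until advance).foreach(i => win.buckets(((nowSlot - i) % windowSlots).toInt) = 0L)
        win.lastSlot = nowSlot
      }
      win.buckets((nowSlot % windowSlots).toInt) += bytes
    }
  }

  // Average push rate over the sliding window, in bytes per second.
  def rateBytesPerSec(appId: String): Long =
    Option(windows.get(appId))
      .map(_.buckets.sum / (windowSlots * slotMillis / 1000L))
      .getOrElse(0L)

  // Exceeding the 200 MB/s limit from the text would trigger back-pressure.
  def shouldThrottle(appId: String, limitBytesPerSec: Long = 200L * 1024 * 1024): Boolean =
    rateBytesPerSec(appId) > limitBytesPerSec
}

In this sketch, workers would call record() on every push and consult shouldThrottle() before accepting more data, pausing and later resuming pushes from offending applications.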

Future Planning & Outlook

The petabyte‑scale shuffle service now operates stably, serving more than 130 k applications, with disk‑flush latency under 5 ms (1.5 ms on average) and read latency under 500 ms (50 ms on average). Upcoming work focuses on:

Operations platformization: Integrate Celeborn management into Ambari for automated scaling, configuration rollout, and reduced manual risk.

Community alignment: Upgrade from Celeborn 0.3.0 to newer releases to gain columnar shuffle and vectorized acceleration and to reduce technical debt.

Cloud‑native & intelligent extensions: Deepen Kubernetes integration and explore AI‑driven resource scheduling.
