
Migrating Spark Offline Computing to Kubernetes: Architecture, Optimizations, and Lessons Learned

Youzan migrated its large‑scale offline Spark workloads from YARN to a cloud‑native Kubernetes architecture, separating storage and compute with Ceph FS, adding dynamic executor allocation and remote shuffle services, and applying numerous Spark and deployment tweaks that yielded elastic scaling, higher resource utilization, reduced costs, and valuable operational lessons.

Youzan Coder

With rapid business growth, the cost of big data processing has risen. This article describes how Youzan's offline computing platform was transformed to a cloud‑native architecture in 2020, achieving lower cost and higher efficiency by containerizing Spark workloads, enabling elastic scaling, and decoupling storage and compute.

Before the migration, the offline platform ran on clusters with 10‑Gbps networking, more than 90% of its tasks ran on Spark, and compute and storage shared the same machines, which caused a “bucket effect”: usable capacity was capped by whichever resource ran out first. The new direction focuses on separating storage from compute, improving machine utilization through Kubernetes, and isolating resources at a finer grain with Docker.

The technical solution addresses two main challenges when migrating from YARN to Kubernetes: missing executor dynamic allocation and handling shuffle/sort data after storage‑compute separation. Solutions include using SPARK‑27963 for dynamic allocation without external shuffle service, enabling shuffle service as a DaemonSet, or adopting remote storage for shuffle data (e.g., Uber’s RemoteShuffle).
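As a minimal sketch of the first option, the settings below enable dynamic allocation together with shuffle tracking, the mechanism that SPARK‑27963 introduced (Spark 3.0 and later). The helper name `DynamicAllocationConf` and the executor bounds are illustrative assumptions, not values from Youzan's deployment.

```scala
// Sketch: Spark settings for dynamic allocation without an external shuffle service.
// DynamicAllocationConf is a hypothetical helper; the bounds are caller-supplied examples.
object DynamicAllocationConf {
  def settings(minExecutors: Int, maxExecutors: Int): Seq[(String, String)] = Seq(
    "spark.dynamicAllocation.enabled" -> "true",
    // Added by SPARK-27963: executors that still hold live shuffle data are tracked
    // so they are not released while that data may be needed.
    "spark.dynamicAllocation.shuffleTracking.enabled" -> "true",
    "spark.dynamicAllocation.minExecutors" -> minExecutors.toString,
    "spark.dynamicAllocation.maxExecutors" -> maxExecutors.toString
  )

  // Render the settings as spark-submit arguments: --conf key=value ...
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Calling `DynamicAllocationConf.toSubmitArgs(DynamicAllocationConf.settings(1, 50))` yields the flags to append to a `spark-submit` command line.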

For storage, the team evaluated hostPath, Ceph RBD, and Ceph FS. HostPath binds Pods to specific nodes and suffers I/O limits. Ceph provides high read/write performance, striping, and multiple storage modes (block, file, object). Ceph FS is recommended for Spark Pods because it supports ReadWriteMany, allowing shuffle data to be accessed like local files.
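One way to hand such a shared volume to Spark Pods is through a PersistentVolumeClaim. The sketch below builds the `spark.kubernetes.<role>.volumes.persistentVolumeClaim.*` settings that Spark on Kubernetes accepts; it assumes a Ceph FS backed claim already exists, and the volume name, claim name, and mount path in the usage line are hypothetical.

```scala
// Sketch: build Spark-on-Kubernetes settings that mount an existing PVC into Spark Pods.
// CephFsVolumeConf is a hypothetical helper; the PVC itself is assumed to be created elsewhere.
object CephFsVolumeConf {
  // role is "driver" or "executor"; volumeName, claimName and mountPath are caller-supplied.
  def pvcVolume(role: String, volumeName: String, claimName: String,
                mountPath: String): Map[String, String] = {
    require(role == "driver" || role == "executor", s"unknown role: $role")
    val prefix = s"spark.kubernetes.$role.volumes.persistentVolumeClaim.$volumeName"
    Map(
      s"$prefix.options.claimName" -> claimName, // which claim to bind
      s"$prefix.mount.path" -> mountPath,        // where it appears inside the container
      s"$prefix.mount.readOnly" -> "false"       // shuffle data must be writable
    )
  }
}
```

For example, `CephFsVolumeConf.pvcVolume("executor", "shuffle", "spark-shuffle-cephfs", "/mnt/shuffle")` produces three settings that can be passed as `--conf` entries.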

Spark migration to Kubernetes required several modifications:

Adding small‑file merging to reduce NameNode pressure.

Integrating Filebeat to ship executor logs to Kafka for reliable log collection.

Adopting a remote shuffle service to detach shuffle data from executor Pods.

Reordering Driver Pod creation to ensure ConfigMaps and Volumes exist before the Driver starts.

Distributing local resources via HDFS instead of embedding them in Docker images.

Exposing Spark Web UI through Ingress.

Extending Spark Pod labels to separate driver and executor scheduling.

Improving Spark app status handling so that spark‑submit returns the driver Pod exit code.

Allowing dynamic adjustment of dynamicAllocation.maxExecutors without restarting the service.
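The article does not say how small files are merged. As one possible illustration of the planning step, the sketch below greedily packs files into groups of roughly a target size, so that each group can be rewritten as a single file. The names `SmallFileMerge`, `FileInfo`, and `plan` are hypothetical.

```scala
// Sketch: group small files into merge batches of at most targetBytes each.
// This is an assumed approach for illustration, not the implementation described by the authors.
object SmallFileMerge {
  final case class FileInfo(path: String, bytes: Long)

  // Greedily pack files, in the given order, into groups whose total size stays
  // at or below targetBytes. A file larger than targetBytes always ends up alone.
  def plan(files: Seq[FileInfo], targetBytes: Long): Vector[Vector[FileInfo]] = {
    require(targetBytes > 0, "targetBytes must be positive")
    val start = (Vector.empty[Vector[FileInfo]], Vector.empty[FileInfo], 0L)
    val (groups, current, _) = files.foldLeft(start) {
      case ((done, cur, size), f) =>
        if (cur.nonEmpty && size + f.bytes > targetBytes)
          (done :+ cur, Vector(f), f.bytes)  // close the current group, start a new one
        else
          (done, cur :+ f, size + f.bytes)   // file still fits in the current group
    }
    if (current.nonEmpty) groups :+ current else groups
  }
}
```

Fewer, larger output files mean fewer file entries for the NameNode to track, which is the pressure this modification targets.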

Deployment optimizations include peak‑shaving mixed‑tenant scheduling and elastic scaling of the Kubernetes cluster based on workload patterns, which significantly improves resource utilization and reduces cloud costs.
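The scaling policy itself is not described in the article. A minimal sketch of one plausible building block, assuming the cluster is sized from the executor cores currently requested and kept within fixed bounds, could look like this; `ElasticScaling` and all parameter names are hypothetical.

```scala
// Sketch: derive a target node count from requested executor cores.
// The sizing rule and the bounds are assumptions for illustration only.
object ElasticScaling {
  // Nodes needed to host requestedCores, clamped to the range [minNodes, maxNodes].
  def desiredNodes(requestedCores: Int, coresPerNode: Int, minNodes: Int, maxNodes: Int): Int = {
    require(coresPerNode > 0, "coresPerNode must be positive")
    require(minNodes >= 0 && minNodes <= maxNodes, "need 0 <= minNodes <= maxNodes")
    val needed = (requestedCores.max(0) + coresPerNode - 1) / coresPerNode // ceiling division
    needed.max(minNodes).min(maxNodes)
  }
}
```

A scheduler loop would call `desiredNodes` periodically and add or release nodes to close the gap; the lower bound keeps a baseline available outside the offline peak.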

Common pitfalls and lessons learned:

Kubernetes may mistakenly kill executors due to containerd‑shim bugs; upgrade containerd.

Linux kernel parameters such as net.core.somaxconn affect shuffle connection stability; tune them.

Executor loss handling had to be changed so that lost executors are removed outright rather than merely disabled.

Memory allocation loops in Spark can cause long waits; add timeout mechanisms.

Shuffle data corruption can be mitigated by applying SPARK‑30225 and SPARK‑26089.

Classpath conflicts when using --files with fat JARs require adding the user‑dir to the executor classpath.

Resource quota exhaustion requires retry logic for pod creation.
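The last item, retrying pod creation, can be sketched with a generic retry helper that uses exponential backoff. The `create` function stands in for whatever call actually creates the Pod; `PodCreationRetry` and its parameters are hypothetical, and a real implementation would likely retry only quota-related failures.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Sketch: retry a pod-creation call with exponential backoff.
// `create` is a placeholder for the real Kubernetes client call.
object PodCreationRetry {
  // Run `create` up to maxAttempts times; after each failure sleep, then double the delay.
  @tailrec
  def retry[A](maxAttempts: Int, delayMs: Long)(create: () => A): Try[A] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    Try(create()) match {
      case s @ Success(_) => s
      case f @ Failure(_) if maxAttempts == 1 => f // attempts exhausted: surface the last error
      case Failure(_) =>
        Thread.sleep(delayMs)
        retry(maxAttempts - 1, delayMs * 2)(create)
    }
  }
}
```

Backing off gives running jobs time to finish and release quota before the next attempt, instead of failing the whole application on the first rejection.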

In conclusion, migrating Youzan’s offline Spark workloads from YARN to Kubernetes enabled cloud‑native capabilities such as storage‑compute separation, containerization, and mixed‑tenant scheduling, delivering hour‑level elasticity and supporting business peaks more effectively.

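For reference, the memory allocation loop behind the long waits listed among the pitfalls (it appears to come from Spark's ExecutionMemoryPool.acquireMemory) looks like this:

while (true) {
  val numActiveTasks = memoryForTask.keys.size
  val curMem = memoryForTask(taskAttemptId)
  // First try to grow the pool, for example by reclaiming memory lent to storage.
  maybeGrowPool(numBytes - memoryFree)
  val maxPoolSize = computeMaxPoolSize()
  // Each task may hold at most 1/N of the pool and should get at least 1/(2N).
  val maxMemoryPerTask = maxPoolSize / numActiveTasks
  val minMemoryPerTask = poolSize / (2 * numActiveTasks)
  // Grant no more than was requested, no more than the per-task cap, and no more than is free.
  val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
  val toGrant = math.min(maxToGrant, memoryFree)
  if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
    logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
    // Blocks with no timeout: if no other task frees memory, this task waits indefinitely.
    lock.wait()
  } else {
    memoryForTask(taskAttemptId) += toGrant
    return toGrant
  }
}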
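The article recommends adding a timeout to this wait but does not show the patch. The standalone sketch below illustrates the idea on a deliberately simplified pool: the wait is bounded by a deadline, and the caller gets 0 bytes back if memory does not become available in time. `BoundedWait.Pool` is a hypothetical class, not Spark's memory manager.

```scala
// Sketch: a simplified memory pool whose acquire() waits with a deadline
// instead of blocking forever. Not Spark code; for illustration only.
object BoundedWait {
  final class Pool(private var free: Long) {
    private val lock = new Object

    // Try to take `bytes` from the pool, waiting at most timeoutMs for a release.
    // Returns `bytes` on success, or 0 if the deadline passes first.
    def acquire(bytes: Long, timeoutMs: Long): Long = lock.synchronized {
      val deadline = System.nanoTime() + timeoutMs * 1000000L
      var remainingMs = timeoutMs
      while (free < bytes && remainingMs > 0) {
        lock.wait(remainingMs) // bounded wait; the loop also guards against spurious wakeups
        remainingMs = (deadline - System.nanoTime()) / 1000000L
      }
      if (free >= bytes) { free -= bytes; bytes } else 0L
    }

    // Return memory to the pool and wake up any waiting callers.
    def release(bytes: Long): Unit = lock.synchronized {
      free += bytes
      lock.notifyAll()
    }
  }
}
```

On a timeout the caller can spill, fail the task, or retry, which turns a silent hang into a visible and recoverable condition.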