Understanding Spark Cache and Checkpoint Mechanisms

This article explains Spark's cache and checkpoint mechanisms, detailing when to use each, how they are implemented internally, how cached and checkpointed RDDs are stored and retrieved, and the differences between caching, persisting, and checkpointing for reliable big‑data processing.

Big Data Technology & Architecture

Spark introduces a cache mechanism that allows frequently accessed data to be stored in memory, speeding up iterative and interactive applications compared to Hadoop MapReduce.

When a task fails, recomputing an RDD's entire lineage (the chain of computations that produced it) can be costly; expensive RDDs can therefore be checkpointed so that recovery starts from a saved state.

Cache Mechanism

In the earlier GroupByTest example, caching the FlatMappedRDD lets subsequent jobs start directly from that RDD, demonstrating shared cached data across jobs within the same application.

Cache only RDDs that are reused and not too large. Users cache an RDD by calling rdd.cache(), which is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY). The call only marks the RDD with that storage level; the partitions are stored when a job first computes them.
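The call described above can be sketched as follows. This is a minimal illustration rather than code from the original article: the object name CacheSketch, the local[2] master, and the 1 to 1000 input are all assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheSketch {
  // Returns the storage level recorded on the RDD and the results of two actions.
  def run(): (StorageLevel, Long, Long) = {
    val conf = new SparkConf().setAppName("CacheSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical input: the squares of 1..1000, split into 4 partitions.
      val squares = sc.parallelize(1 to 1000, 4).map(x => x.toLong * x)

      // Marks the RDD with MEMORY_ONLY; nothing is stored yet.
      squares.cache()

      val first = squares.count()  // first job: computes the partitions and caches them
      val second = squares.count() // second job: can start from the cached partitions
      (squares.getStorageLevel, first, second)
    } finally {
      sc.stop()
    }
  }
}
```

Both actions return the same count; the difference is that the second job does not need to rerun the map step as long as the cached partitions are still in memory.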

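Internally, when a partition is computed, Spark checks whether its RDD is marked for caching; if so, the partition is handed to the BlockManager, which keeps it in the memoryStore. Under MEMORY_ONLY, a partition that does not fit in memory is not cached and is recomputed when needed; it is written to the diskStore only when the storage level includes disk, such as MEMORY_AND_DISK.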

Cached partitions are stored in the BlockManager's memoryStore, or in its diskStore for storage levels that use disk. When memory runs out, the memoryStore evicts older cached partitions using an LRU-like policy.
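To make the LRU-like eviction concrete, here is a toy model in plain Scala. It is a sketch under stated assumptions, not Spark's memoryStore: the class ToyMemoryStore, its byte-count bookkeeping, and the block ids are hypothetical, and the real store applies further rules that are ignored here. The only idea it borrows is keeping entries in access order so that the least recently used block is evicted first.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for a memory store: block id -> size in bytes (hypothetical names).
class ToyMemoryStore(maxBytes: Long) {
  // accessOrder = true keeps entries ordered from least to most recently used.
  private val entries = new JLinkedHashMap[String, java.lang.Long](32, 0.75f, true)
  private var usedBytes = 0L

  // Looks up a block; a hit also marks the block as most recently used.
  def get(blockId: String): Boolean = entries.get(blockId) != null

  // Membership check that does not change the access order.
  def contains(blockId: String): Boolean = entries.containsKey(blockId)

  // Stores a block, evicting least recently used blocks until it fits.
  // Returns the evicted ids; a block larger than maxBytes is not stored at all.
  def put(blockId: String, size: Long): Seq[String] = {
    val evicted = ArrayBuffer.empty[String]
    if (size <= maxBytes) {
      val previous = entries.remove(blockId)
      if (previous != null) usedBytes -= previous.longValue
      val oldestFirst = entries.entrySet().iterator()
      while (usedBytes + size > maxBytes && oldestFirst.hasNext) {
        val entry = oldestFirst.next()
        usedBytes -= entry.getValue.longValue
        evicted += entry.getKey
        oldestFirst.remove()
      }
      entries.put(blockId, java.lang.Long.valueOf(size))
      usedBytes += size
    }
    evicted.toSeq
  }
}
```

With a 100-byte store holding two 40-byte blocks, reading the first block and then adding a third 40-byte block evicts the second one, because it is now the least recently used.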

When a later job needs a partition of a cached RDD, the task first queries its local BlockManager. If the partition is stored locally, the task reads it directly; otherwise the BlockManager asks the driver for the partition's location and fetches the partition from the remote BlockManager that holds it.

Checkpoint

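RDDs with long lineages or expensive computations are good candidates for checkpointing. Checkpointing writes the RDD to reliable storage (e.g., HDFS) in a separate job that runs after the job that first computes it, so the RDD is computed a second time unless it was cached beforehand. Calling rdd.cache() before rdd.checkpoint() avoids that recomputation.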
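The extra computation can be observed with an accumulator. The sketch below is an illustration under assumptions, not code from the original article: the object name, the temporary checkpoint directory, and the 8-element input are made up, and it relies on sc.longAccumulator, which is available in Spark 2.0 and later.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointRecomputeSketch {
  // Runs one action on a checkpointed RDD and returns how many times the
  // map function was invoked. cacheFirst toggles the cache() call.
  def mapInvocations(sc: SparkContext, cacheFirst: Boolean): Long = {
    val calls = sc.longAccumulator("map calls")
    val doubled = sc.parallelize(1 to 8, 2).map { x => calls.add(1L); x * 2 }
    if (cacheFirst) doubled.cache()
    doubled.checkpoint() // only marks the RDD; must be called before the first action
    doubled.count()      // the job runs, then a second job writes the checkpoint
    calls.value
  }

  def run(): (Long, Long) = {
    val conf = new SparkConf().setAppName("CheckpointRecompute").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical checkpoint location: a fresh temporary directory.
      sc.setCheckpointDir(Files.createTempDirectory("ckpt").toString)
      (mapInvocations(sc, cacheFirst = false), mapInvocations(sc, cacheFirst = true))
    } finally {
      sc.stop()
    }
  }
}
```

On a local run in which no task is retried, the uncached variant invokes the map function twice per element (once for count, once for the checkpoint job), while the cached variant invokes it once per element.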

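Typical workflow: the driver program sets a checkpoint directory with sc.setCheckpointDir() and calls rdd.checkpoint(), which only marks the RDD. After the first job that uses the RDD finishes, Spark runs a separate job that writes the RDD's partitions to the checkpoint directory.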

In later jobs, before computing a partition, Spark checks whether the RDD has been checkpointed; if so, it reads the partition from the checkpoint file through a special CheckpointRDD that acts as the RDD's parent.

Unlike caching, checkpointing discards the lineage, allowing long lineages to be truncated and providing fault‑tolerant persistence.
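The truncation can be seen by comparing toDebugString before and after the checkpoint is written. This is a minimal sketch with assumed names (LineageSketch, a temporary checkpoint directory, a three-element input); the exact text of the debug strings varies between Spark versions, so the example only looks for the ShuffledRDD entry.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object LineageSketch {
  // Returns the lineage description before and after checkpointing,
  // plus the RDD's isCheckpointed flag.
  def run(): (String, String, Boolean) = {
    val conf = new SparkConf().setAppName("LineageSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical checkpoint location: a fresh temporary directory.
      sc.setCheckpointDir(Files.createTempDirectory("ckpt").toString)

      val pairs = sc.parallelize(Seq((1, 'a'), (2, 'b'), (1, 'c')), 2)
      val sizes = pairs.groupByKey(2).mapValues(_.size)

      sizes.cache()      // avoids computing the RDD twice
      sizes.checkpoint() // marks the RDD; nothing is written yet
      val before = sizes.toDebugString

      sizes.count()      // runs the job, then the checkpoint job
      val after = sizes.toDebugString

      (before, after, sizes.isCheckpointed)
    } finally {
      sc.stop()
    }
  }
}
```

Before the action, the lineage still lists the shuffle and the parallelized collection. Afterwards the RDD's parents are replaced by the checkpoint RDD, so those entries no longer appear.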

Persisting with rdd.persist(StorageLevel.DISK_ONLY) differs from checkpointing: persisted data lives in the BlockManager and is removed when the executor shuts down, whereas checkpointed data remains on HDFS and can be reused by subsequent driver programs.
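The two calls can be put side by side as below. This is a sketch under assumptions: the object name, the temporary directory, and the helper fileCount are hypothetical, and the example only checks that checkpoint files are still on disk after the SparkContext has stopped. How a later driver program would read them back is outside its scope.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistVsCheckpointSketch {
  // Hypothetical helper: counts regular files under a directory tree.
  private def fileCount(dir: File): Int = {
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
    children.map(f => if (f.isDirectory) fileCount(f) else 1).sum
  }

  // Returns the number of checkpoint files left behind after sc.stop().
  def run(): Int = {
    val checkpointDir = Files.createTempDirectory("ckpt").toFile
    val conf = new SparkConf().setAppName("PersistVsCheckpoint").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      sc.setCheckpointDir(checkpointDir.getPath)

      // Persisted blocks are managed by the BlockManager and belong to this application.
      val persisted = sc.parallelize(1 to 100, 2).map(_ + 1).persist(StorageLevel.DISK_ONLY)

      // Checkpointed partitions are written as files under the checkpoint directory.
      val checkpointed = sc.parallelize(1 to 100, 2).map(_ * 2)
      checkpointed.checkpoint()

      persisted.count()
      checkpointed.count()
    } finally {
      sc.stop()
    }
    fileCount(checkpointDir)
  }
}
```

With default settings the checkpoint directory still contains the partition files after the application ends, which is what allows them to outlive the driver program that wrote them.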

Discussion

Hadoop MapReduce repeatedly persists intermediate data, yet a failed task must still rerun from scratch. Spark's cache and checkpoint mechanisms reduce this redundant work: users can manually checkpoint critical RDDs to avoid costly recomputation, accepting the overhead of an extra job.

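The listings below show, in order: the call chain Spark follows when a task reads a partition of an RDD marked for caching, a snippet that marks two RDDs for checkpointing, and a complete program that sets a checkpoint directory, checkpoints an RDD, and performs a groupByKey operation.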

rdd.iterator()
=> SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
   => key = RDDBlockId(rdd.id, split.index)
   => blockManager.get(key)    // cache hit: return the stored partition
   // cache miss: compute the partition, or read it from its checkpoint
   => computedValues = rdd.computeOrReadCheckpoint(split, context)
        if (isCheckpointed) firstParent[T].iterator(split, context)
        else compute(split, context)
   => elements = new ArrayBuffer[Any]
   => elements ++= computedValues
   // store the partition and report the new block to the driver
   => updatedBlocks = blockManager.put(key, elements, tellMaster = true)

// Marking two RDDs for checkpointing. Assumes sc.setCheckpointDir(...) has
// already been called; nothing is written until an action runs on result.
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint()

val result = pairs1.join(pairs2)
result.checkpoint()

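// Complete program: set a checkpoint directory, checkpoint an RDD, run groupByKey.
package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object groupByKeyTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    // Checkpoint files are written under this directory.
    sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")

    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
                                  (3, 'c'), (4, 'd'),
                                  (5, 'e'), (3, 'f'),
                                  (2, 'g'), (1, 'h'))
    val pairs = sc.parallelize(data, 3)

    // checkpoint() only marks the RDD; count() triggers the job that writes it.
    pairs.checkpoint()
    pairs.count()

    val result = pairs.groupByKey(2)

    // foreachWith was removed in Spark 2.0; mapPartitionsWithIndex exposes
    // the partition index instead.
    result
      .mapPartitionsWithIndex((i, iter) => iter.map(x => "[PartitionIndex " + i + "] " + x))
      .foreach(println)

    println(result.toDebugString)
    sc.stop()
  }
}
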
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
