Understanding Spark Cache and Checkpoint Mechanisms
This article explains Spark's cache and checkpoint mechanisms, detailing when to use each, how they are implemented internally, how cached and checkpointed RDDs are stored and retrieved, and the differences between caching, persisting, and checkpointing for reliable big‑data processing.
Spark introduces a cache mechanism that allows frequently accessed data to be stored in memory, speeding up iterative and interactive applications compared to Hadoop MapReduce.
If a task fails, recomputing its entire computation chain (lineage) can be costly, so RDDs that are expensive to compute can be checkpointed and later recovered from the saved state.
Cache Mechanism
In the earlier GroupByTest example, caching the FlatMappedRDD lets subsequent jobs start directly from that RDD, which shows that cached data is shared across jobs within the same application.
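Only RDDs that are reused and not too large should be cached. Users cache an RDD by calling rdd.cache(), which is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY). It does not create a new RDD; it marks the existing RDD with the MEMORY_ONLY storage level, and the partitions are actually stored the first time a job computes them.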
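Internally, whenever a partition is computed, Spark checks the RDD's storage level; if the RDD is marked for caching, the computed partition is stored in the BlockManager's memoryStore. Whether a partition that does not fit in memory falls back to the diskStore depends on the storage level: with MEMORY_AND_DISK it is written to disk, with DISK_ONLY it goes to disk directly, and with the default MEMORY_ONLY level it is simply not cached and is recomputed when needed. When memory runs out, the memoryStore evicts older cached blocks using an LRU-like policy; evicted blocks whose storage level includes disk are dropped to the diskStore, and the rest are discarded.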
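To make the storage-level and eviction rules concrete, here is a small self-contained toy model in Scala. It is not Spark's MemoryStore: ToyBlockStore, the Level values, and the integer block sizes are hypothetical, and capacity is counted in abstract units. It uses a java.util.LinkedHashMap in access order so that iteration yields the least-recently-used block first; the block IDs merely imitate Spark's rdd_<rddId>_<partitionIndex> naming for readability.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Spark's MemoryStore/DiskStore: names, sizes and
// capacity are invented for illustration.
sealed trait Level
case object MemoryOnly extends Level    // plays the role of MEMORY_ONLY
case object MemoryAndDisk extends Level // plays the role of MEMORY_AND_DISK

final class ToyBlockStore(capacity: Int) {
  // accessOrder = true: iteration starts at the least-recently-used entry
  private val memory = new java.util.LinkedHashMap[String, (Int, Level)](16, 0.75f, true)
  private val disk = mutable.Map.empty[String, Int]
  private var used = 0

  def inMemory(id: String): Boolean = memory.containsKey(id)

  // Reading a block from memory also refreshes its LRU position
  def get(id: String): Option[String] =
    if (memory.get(id) != null) Some("memory")
    else if (disk.contains(id)) Some("disk")
    else None

  def put(id: String, size: Int, level: Level): Unit =
    if (memory.containsKey(id) || disk.contains(id)) () // already stored: nothing to do
    else if (size <= capacity) {
      // Evict least-recently-used blocks until the new block fits
      val it = memory.entrySet().iterator()
      while (used + size > capacity && it.hasNext) {
        val entry = it.next()
        val (evictedSize, evictedLevel) = entry.getValue
        it.remove()
        used -= evictedSize
        // An evicted block survives on disk only if its level allows disk
        if (evictedLevel == MemoryAndDisk) disk(entry.getKey) = evictedSize
      }
      memory.put(id, (size, level))
      used += size
    }
    else if (level == MemoryAndDisk) disk(id) = size // never fits in memory: straight to disk
    else () // MEMORY_ONLY and too large: not cached, recomputed when needed
}
```

For example, with a capacity of 10 units two 4-unit partitions fit; a third one forces out whichever block was touched least recently, and that block lands on disk only if it was stored with MemoryAndDisk.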
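When a later job needs a cached partition, the task first asks its local BlockManager; if the partition is held locally, it is read directly. Otherwise the BlockManager asks the BlockManagerMaster on the driver for the block's location and fetches the partition from a remote BlockManager. If no copy exists anywhere (for example, because it was evicted), the partition is recomputed from its lineage.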
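The lookup order (local BlockManager, then the driver's location registry, then a remote fetch, and finally recomputation) can be sketched with another hypothetical toy model. ToyDriver and ToyExecutor are invented names standing in for the BlockManagerMaster and the per-executor BlockManagers; real Spark also uses cached-block locations to schedule tasks close to their data, which this sketch leaves out.

```scala
import scala.collection.mutable

// Hypothetical toy model: ToyDriver stands in for the driver-side BlockManagerMaster,
// ToyExecutor for an executor's BlockManager. These are not Spark classes.
final class ToyDriver {
  val locations = mutable.Map.empty[String, Set[String]] // blockId -> executor ids
  val executors = mutable.Map.empty[String, ToyExecutor]
  var computations = 0                                   // partitions computed from lineage
}

final class ToyExecutor(val id: String, driver: ToyDriver) {
  private val local = mutable.Map.empty[String, Vector[Int]]
  driver.executors(id) = this

  def read(blockId: String): Option[Vector[Int]] = local.get(blockId)

  // Returns the partition's data and where it came from
  def getOrCompute(blockId: String)(compute: => Vector[Int]): (Vector[Int], String) =
    local.get(blockId) match {
      case Some(data) => (data, "local") // 1. local hit
      case None =>
        // 2. ask the driver which executors hold the block, then fetch remotely
        val remote = driver.locations.getOrElse(blockId, Set.empty[String]).toSeq
          .flatMap(exec => driver.executors(exec).read(blockId))
          .headOption
        remote match {
          case Some(data) => (data, "remote")
          case None =>
            // 3. no copy anywhere: recompute, cache locally, register with the driver
            driver.computations += 1
            val data = compute
            local(blockId) = data
            driver.locations(blockId) = driver.locations.getOrElse(blockId, Set.empty[String]) + id
            (data, "computed")
        }
    }
}
```

In this sketch the first request for a block computes it, a repeated request on the same executor is a local hit, and a request from a second executor is served remotely, so the partition is computed only once.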
Checkpoint
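RDDs with long lineages or expensive computations should be checkpointed. Checkpointing writes the RDD to reliable storage (e.g., HDFS) after a job on it finishes. Because the write is performed by a separate job, the RDD is computed once more unless it is also cached, so it is advisable to persist an RDD before checkpointing it.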
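Typical workflow: the driver program sets a checkpoint directory with sc.setCheckpointDir and calls rdd.checkpoint() before any job has run on the RDD; this call only marks the RDD. After the first job involving the RDD finishes, Spark runs a separate job that writes the RDD's partitions to the checkpoint directory.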
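The cost of the extra checkpoint job can be illustrated with a toy scheduler. ToyRDD, ToyScheduler, and the in-memory reliableStorage map are hypothetical stand-ins (nothing is written to HDFS); the sketch only mirrors the ordering in which the action's job runs first and a separate pass then writes every partition of the marked RDD. Counting compute calls shows why caching an RDD before checkpointing it avoids computing it twice.

```scala
import scala.collection.mutable

// Hypothetical toy model: ToyRDD, ToyScheduler and reliableStorage are invented names;
// "reliable storage" is just an in-memory map here, not HDFS.
final class ToyRDD(val name: String, val numPartitions: Int)(f: Int => Vector[Int]) {
  var computeCalls = 0
  var cached = false              // set to true to model rdd.cache()
  var markedForCheckpoint = false // set to true to model rdd.checkpoint()
  private val cacheStore = mutable.Map.empty[Int, Vector[Int]]

  private def compute(p: Int): Vector[Int] = { computeCalls += 1; f(p) }

  // Cached partitions are computed once and then served from cacheStore
  def iterator(p: Int): Vector[Int] =
    if (cached) cacheStore.getOrElseUpdate(p, compute(p)) else compute(p)
}

object ToyScheduler {
  val reliableStorage = mutable.Map.empty[(String, Int), Vector[Int]]

  // Runs a count action, then (as Spark does after the job) a separate pass
  // that writes every partition of a marked RDD to reliable storage
  def runJob(rdd: ToyRDD): Int = {
    val count = (0 until rdd.numPartitions).map(p => rdd.iterator(p).size).sum
    if (rdd.markedForCheckpoint)
      for (p <- 0 until rdd.numPartitions) reliableStorage((rdd.name, p)) = rdd.iterator(p)
    count
  }
}
```

With two partitions, an uncached marked RDD is computed four times (twice per partition), while a cached one is computed twice and the checkpoint pass reads from the cache.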
When the RDD is computed again, Spark checks whether it has been checkpointed; if so, it reads the data from the checkpoint files through a CheckpointRDD that has replaced the RDD's original parents.
Unlike caching, checkpointing discards the lineage, allowing long lineages to be truncated and providing fault‑tolerant persistence.
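Lineage truncation can be sketched in the same spirit. In this hypothetical toy (ToyNode, SourceNode, MapNode, and CheckpointNode are invented names, not Spark classes), iterator plays the role of computeOrReadCheckpoint: once a node is checkpointed, its dependency becomes a CheckpointNode holding the saved partitions, so the lineage shrinks and the original source is no longer touched.

```scala
// Hypothetical toy lineage model: ToyNode and its subclasses are invented names.
abstract class ToyNode {
  protected def parent: Option[ToyNode]
  protected def compute(p: Int): Vector[Int]

  private var checkpointed: Option[ToyNode] = None

  // Like computeOrReadCheckpoint: once checkpointed, read through the checkpoint parent
  def iterator(p: Int): Vector[Int] = checkpointed match {
    case Some(cp) => cp.iterator(p)
    case None     => compute(p)
  }

  // After checkpointing, the checkpoint node replaces the original parent chain
  def dependency: Option[ToyNode] = checkpointed.orElse(parent)
  def lineageDepth: Int = 1 + dependency.map(_.lineageDepth).getOrElse(0)

  // Materialize every partition once, then swap in the saved data as the only parent
  def checkpoint(numPartitions: Int): Unit = {
    val saved = (0 until numPartitions).map(p => iterator(p)).toVector
    checkpointed = Some(new CheckpointNode(saved))
  }
}

final class CheckpointNode(saved: Vector[Vector[Int]]) extends ToyNode {
  protected def parent: Option[ToyNode] = None
  protected def compute(p: Int): Vector[Int] = saved(p) // stands in for reading a file
}

final class SourceNode(data: Vector[Vector[Int]]) extends ToyNode {
  var computeCalls = 0
  protected def parent: Option[ToyNode] = None
  protected def compute(p: Int): Vector[Int] = { computeCalls += 1; data(p) }
}

final class MapNode(prev: ToyNode, f: Int => Int) extends ToyNode {
  protected def parent: Option[ToyNode] = Some(prev)
  protected def compute(p: Int): Vector[Int] = prev.iterator(p).map(f)
}
```

Stacking 50 map nodes on a source gives a lineage depth of 51; after checkpointing, the top node's lineage is just itself plus the checkpoint node (depth 2), and reading it no longer calls the source. In real Spark a comparable effect shows up in rdd.toDebugString, where a checkpoint RDD appears in place of the original parents.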
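Persisting with rdd.persist(StorageLevel.DISK_ONLY) differs from checkpointing: the persisted blocks are managed by the BlockManager, the lineage is kept, and the data is lost when the executor exits (at the latest when the application ends), whereas checkpointed data remains on reliable storage such as HDFS after the application finishes and can be reused by subsequent driver programs.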
Discussion
Compared with Hadoop MapReduce, which writes intermediate data to disk at the end of every task and job and reruns failed tasks from their inputs, Spark's cache and checkpoint mechanisms reduce redundant I/O and recomputation. Users can manually checkpoint critical RDDs to avoid costly recomputation after failures, accepting the overhead of an extra job (and an extra computation of the RDD unless it is also cached).
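This trade-off can be put into rough numbers with a deliberately simplified cost model. Everything here is an assumption for illustration: the RDDs form a linear chain with made-up per-step costs, losing the last RDD means redoing every step after the most recent checkpoint, reading and writing checkpoint files are treated as free, and CheckpointCost is a hypothetical helper rather than anything in Spark.

```scala
// Hypothetical cost model for illustration only: linear chain, invented per-step costs,
// checkpoint I/O treated as free, shuffle outputs ignored as recovery points.
object CheckpointCost {
  /** Work redone if the last RDD is lost: every step after the last checkpoint. */
  def recoveryCost(costs: Vector[Int], checkpointAt: Option[Int]): Int =
    checkpointAt match {
      case Some(k) => costs.drop(k + 1).sum
      case None    => costs.sum
    }

  /** Extra work of the checkpoint job: recompute steps 0..k unless that RDD is cached. */
  def checkpointOverhead(costs: Vector[Int], k: Int, cached: Boolean): Int =
    if (cached) 0 else costs.take(k + 1).sum
}
```

With four steps costing 10 units each, a failure without any checkpoint redoes 40 units; checkpointing the third RDD (index 2) cuts that to 10, at an up-front cost of 30 units if the RDD is not cached and, in this model, none if it is.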
In Spark 1.x, the cache read/write path described in the Cache Mechanism section follows roughly this call chain:
rdd.iterator()
  => SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
    => key = RDDBlockId(rdd.id, split.index)
    => blockManager.get(key)                                         // cache hit: return the stored partition
    => computedValues = rdd.computeOrReadCheckpoint(split, context)  // cache miss
         if (isCheckpointed) firstParent[T].iterator(split, context)
         else compute(split, context)
    => elements = new ArrayBuffer[Any]
    => elements ++= computedValues
    => updatedBlocks = blockManager.put(key, elements, tellMaster = true)
The next snippet marks both an input RDD and the result of a join for checkpointing:
// Assumes sc is a SparkContext on which sc.setCheckpointDir has already been called.
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)
pairs2.checkpoint()  // mark pairs2 for checkpointing
val result = pairs1.join(pairs2)
result.checkpoint()  // mark the join result for checkpointing
The complete program below sets a checkpoint directory, checkpoints an RDD, and performs a groupByKey operation:
package internals
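import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit pair-RDD conversions; required only before Spark 1.3
import org.apache.spark.SparkConf

object groupByKeyTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    // Checkpoint files are written under this directory (use an HDFS path on a cluster)
    sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")

    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h')
    )
    val pairs = sc.parallelize(data, 3)
    pairs.checkpoint()  // must be called before any job runs on pairs
    pairs.count()       // first action; afterwards Spark runs a job that writes pairs to the checkpoint dir

    val result = pairs.groupByKey(2)
    // foreachWith was deprecated in Spark 1.0 and removed in 2.0;
    // mapPartitionsWithIndex supplies the partition index instead
    result.mapPartitionsWithIndex((i, iter) => iter.map(x => "[PartitionIndex " + i + "] " + x))
      .foreach(println)
    println(result.toDebugString)  // lineage now starts from the checkpointed data
    sc.stop()
  }
}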
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
