Understanding Spark's BlockManager, MemoryStore, and DiskStore
This article explains Spark's storage architecture, detailing the roles and interactions of BlockManager, MemoryStore, and DiskStore, including their initialization, data management mechanisms, code implementations, and eviction strategies, to help readers grasp how Spark efficiently handles in‑memory and on‑disk data.
In previous lessons we covered Spark's RDDs and key components of the Spark system, as well as the crucial concept of Shuffle. This article focuses on Spark's storage subsystem, which is essential for both task submission and execution.
Spark avoids much of the disk I/O bottleneck of Hadoop MapReduce by keeping configuration information and computation results primarily in memory, falling back to disk only when necessary. The storage system consists of several core modules:
BlockManager: the central module of the storage system.
DiskBlockManager: manages the mapping between Blocks and the files on disk.
MemoryStore: handles in-memory storage.
DiskStore: handles on-disk storage.
BlockManager
BlockManager runs on every node (both Driver and Executors) and provides management of Blocks stored in local or remote memory, disk, and off‑heap memory. It works together with BlockInfoManager, DiskBlockManager, DiskStore, MemoryManager, MemoryStore, BlockManagerMaster, and BlockTransferService to manage the lifecycle of Blocks across the cluster.
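The architecture follows a Master-Slave model. The Driver hosts the BlockManagerMaster, which tracks which node holds which Block, and every node (the Driver as well as each Executor) runs its own BlockManager that registers with it. On the Driver, the BlockManager is initialized during SparkContext creation through SparkEnv.blockManager.initialize, and each Executor initializes its own BlockManager in the same way when it starts.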
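The registration relationship described above can be illustrated with a small, self-contained model. This is only a sketch: `NodeId`, `MasterSketch`, `register`, `reportBlock`, and `locationsOf` are hypothetical names invented for illustration, not Spark's classes or RPC messages, and block IDs are plain strings.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a BlockManagerId: identifies one node's block manager.
final case class NodeId(executorId: String, host: String, port: Int)

// Hypothetical stand-in for the master side: remembers which nodes registered
// and which of them reported holding a given block.
final class MasterSketch {
  private val registered = mutable.LinkedHashMap.empty[NodeId, Long]
  private val locations = mutable.Map.empty[String, mutable.Set[NodeId]]

  // A node announces itself together with the memory it can use for storage.
  def register(id: NodeId, maxMemory: Long): NodeId = {
    registered(id) = maxMemory
    id
  }

  // A registered node reports that it now holds the given block.
  def reportBlock(blockId: String, holder: NodeId): Unit = {
    require(registered.contains(holder), s"$holder has not registered")
    locations.getOrElseUpdate(blockId, mutable.Set.empty[NodeId]) += holder
  }

  // Any node can ask where a block lives before fetching it remotely.
  def locationsOf(blockId: String): Set[NodeId] =
    locations.get(blockId).map(_.toSet).getOrElse(Set.empty[NodeId])
}
```

In this model the master only holds metadata. The block bytes stay on the node that reported them, which mirrors the division of labor between the Driver-side master and the per-node BlockManager described in the text.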
Key initialization steps (simplified) are shown in the code snippet below:
```scala
def initialize(appId: String): Unit = {
  // Initialize BlockTransferService (Netty implementation)
  blockTransferService.init(this)
  shuffleClient.init(appId)
  // Set block replication policy
  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }
  // Create BlockManagerId for this executor
  val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
  // Register with BlockManagerMaster
  val idFromMaster = master.registerBlockManager(id, maxMemory, slaveEndpoint)
  blockManagerId = if (idFromMaster != null) idFromMaster else id
  // Handle external shuffle service if enabled
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
  logInfo(s"Initialized BlockManager: $blockManagerId")
}
```
MemoryStore
MemoryStore stores Blocks in memory, improving read/write efficiency by avoiding disk I/O. It manages MemoryEntry objects, which can be either DeserializedMemoryEntry (non‑serialized data) or SerializedMemoryEntry (serialized data).
Internally, MemoryStore keeps its MemoryEntry objects in a LinkedHashMap keyed by BlockId. The map is created in access order rather than insertion order, so iteration starts at the least recently accessed entry, which gives an LRU (Least Recently Used) eviction policy. When memory is insufficient, blocks are evicted from the head of the map, and if the RDD's storage level permits, they are written to disk rather than simply discarded.
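To see how a LinkedHashMap can yield LRU ordering, here is a minimal sketch built on `java.util.LinkedHashMap` in access-order mode (the third constructor argument set to `true`). `LruIndexSketch` and its methods are hypothetical names for illustration, and block IDs and sizes are simplified to strings and longs.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Hypothetical illustration of an access-ordered index of cached blocks.
final class LruIndexSketch {
  // accessOrder = true: every get or put moves the entry to the tail of the map.
  private val entries = new JLinkedHashMap[String, java.lang.Long](32, 0.75f, true)

  def put(blockId: String, size: Long): Unit = {
    entries.put(blockId, java.lang.Long.valueOf(size))
    ()
  }

  // Reading a block counts as an access and refreshes its position.
  def touch(blockId: String): Boolean = entries.get(blockId) != null

  // Iteration runs from least to most recently accessed, i.e. eviction order.
  def evictionOrder: List[String] = {
    val out = List.newBuilder[String]
    val it = entries.keySet().iterator()
    while (it.hasNext) out += it.next()
    out.result()
  }
}
```

After inserting blocks `a`, `b`, `c` and then touching `a`, the eviction order becomes `b`, `c`, `a`: the block that was read most recently is now the last candidate for eviction.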
The definition of MemoryEntry is:
```scala
// Code location: org.apache.spark.storage.memory
private sealed trait MemoryEntry[T] {
  // Block size
  def size: Long
  // Memory mode: ON_HEAP or OFF_HEAP
  def memoryMode: MemoryMode
  // Class tag of the data
  def classTag: ClassTag[T]
}
```
Eviction
When free memory is needed, Spark calls MemoryStore#evictBlocksToFreeSpace, which removes blocks from the head of the LRU list and optionally writes them to disk.
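As a rough model of that selection logic, the sketch below walks an access-ordered map from its head, picks victims until enough bytes would be freed, and only then removes them. Everything here is a simplification under stated assumptions: `CachedBlock`, `EvictionSketch`, the `spillable` flag (standing in for a storage level that allows disk), and the `spilled` list (standing in for a disk write) are hypothetical. The rule that blocks of the same RDD as the incoming block are skipped, and that nothing is evicted when the target cannot be met, reflects my reading of the Spark 2.x source and should be checked against the version you run.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

// Hypothetical cached block: owning RDD, size in bytes, and whether its
// storage level would allow it to be written to disk on eviction.
final case class CachedBlock(rddId: Int, size: Long, spillable: Boolean)

final class EvictionSketch {
  private val entries = new JLinkedHashMap[String, CachedBlock](32, 0.75f, true)
  // Stand-in for "written to disk": records the IDs that would be spilled.
  val spilled: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def put(id: String, block: CachedBlock): Unit = { entries.put(id, block); () }
  def contains(id: String): Boolean = entries.containsKey(id)

  // Returns the number of bytes freed, or 0 if the request cannot be satisfied.
  def evictToFreeSpace(incomingRdd: Option[Int], space: Long): Long = {
    val selected = ArrayBuffer.empty[String]
    var freed = 0L
    val it = entries.entrySet().iterator()
    // Walk from the least recently used entry toward the most recent one.
    while (freed < space && it.hasNext) {
      val e = it.next()
      // Skip blocks of the RDD being stored, so an RDD does not evict itself.
      if (!incomingRdd.contains(e.getValue.rddId)) {
        selected += e.getKey
        freed += e.getValue.size
      }
    }
    if (freed >= space) {
      selected.foreach { id =>
        val block = entries.remove(id)
        if (block.spillable) spilled += id
      }
      freed
    } else {
      0L // not enough evictable memory: leave every block in place
    }
  }
}
```

Selecting first and removing afterwards means a request that cannot be satisfied leaves the cache untouched instead of evicting blocks for no benefit. The actual method in MemoryStore has the following signature: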
```scala
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = { ... }
```
DiskStore
DiskStore implements the block storage on disk, using DiskBlockManager to map Blocks to files. It provides methods such as getSize, contains, remove, putBytes, and getBytes for block manipulation.
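One way to map a Block to a file is to hash its name and use the hash to pick a local directory and a subdirectory inside it. The sketch below is modeled on that idea and is not a copy of DiskBlockManager: `BlockFileSketch`, `fileFor`, and the parameter names are assumptions made for illustration, and no directories are created.

```scala
import java.io.File

object BlockFileSketch {
  // Hash that is never negative, so it can be used with the modulo operator.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Deterministically choose <localDir>/<two-digit hex subdir>/<blockName>.
  def fileFor(blockName: String, localDirs: IndexedSeq[File], subDirsPerLocalDir: Int): File = {
    require(localDirs.nonEmpty, "at least one local directory is required")
    require(subDirsPerLocalDir > 0, "subDirsPerLocalDir must be positive")
    val hash = nonNegativeHash(blockName)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
    new File(subDir, blockName)
  }
}
```

Because the path depends only on the block name and the directory layout, the same block always resolves to the same file, and blocks spread across the configured directories without any lookup table.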
Key fields of DiskStore include the Spark configuration, a reference to DiskBlockManager, and thresholds for memory‑mapped file reads.
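The memory-map threshold can be understood through a small read routine: files below the threshold are copied into an ordinary heap buffer, while larger files are memory-mapped to avoid the copy. This is a hedged sketch using only `java.nio`. `DiskReadSketch.read` and its `minMemoryMapBytes` parameter are illustrative names, and where exactly Spark applies this check differs between versions.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

object DiskReadSketch {
  // Read a whole file, choosing the strategy by size.
  def read(path: Path, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try {
      val size = channel.size()
      if (size < minMemoryMapBytes) {
        // Small file: mapping overhead is not worth it, copy into the heap.
        require(size <= Int.MaxValue, "file too large for a single heap buffer")
        val buf = ByteBuffer.allocate(size.toInt)
        while (buf.hasRemaining && channel.read(buf) >= 0) {}
        buf.flip()
        buf
      } else {
        // Large file: let the OS page it in on demand. The mapping stays
        // valid after the channel is closed.
        channel.map(FileChannel.MapMode.READ_ONLY, 0L, size)
      }
    } finally {
      channel.close()
    }
  }
}
```

Mapping very small files tends to cost more than it saves, because each mapping carries a fixed overhead, which is why a threshold is used at all. The fields in DiskStore itself are declared as follows: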
```scala
private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging {
  // Threshold for using memory-mapped reads (default 2M)
  private val minMemoryMapBytes = conf.get(config.STORAGE_MEMORY_MAP_THRESHOLD)
  // Max threshold for memory-mapped reads in tests
  private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
  // Mapping of BlockId to its size
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
  ...
}
```
In summary, Spark's storage subsystem combines BlockManager, MemoryStore, and DiskStore to efficiently manage data in memory and on disk, using LRU eviction and configurable replication policies to balance performance and reliability.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.