Understanding Hadoop's Circular Buffer in the Shuffle Phase
This article explains how Hadoop's MapReduce shuffle uses a circular buffer to store serialized key/value pairs and their metadata, detailing its structure, initialization, write path, spill logic, and the background thread that sorts and writes data to disk.
During the MapReduce shuffle, Hadoop stores map output in an in‑memory circular buffer (also called a ring buffer) that holds serialized key/value data and accompanying metadata. A circular buffer is a fixed‑size array whose write position wraps around to the beginning when it reaches the end, which makes full and empty states cheap to detect and lets space be reused without moving data.
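Before looking at Hadoop's version, a minimal generic sketch may help. This is illustrative only; the RingBuffer class and its names are invented for this example and are not Hadoop code:

// Minimal illustrative ring buffer; NOT Hadoop's implementation.
class RingBuffer {
    private final byte[] buf;
    private int head, tail, count; // count disambiguates full vs. empty

    RingBuffer(int capacity) { buf = new byte[capacity]; }

    boolean isEmpty() { return count == 0; }
    boolean isFull()  { return count == buf.length; }

    void put(byte b) {
        if (isFull()) throw new IllegalStateException("full");
        buf[tail] = b;
        tail = (tail + 1) % buf.length; // wrap around the end
        count++;
    }

    byte get() {
        if (isEmpty()) throw new IllegalStateException("empty");
        byte b = buf[head];
        head = (head + 1) % buf.length;
        count--;
        return b;
    }
}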
The circular buffer consists of a byte array kvbuffer for the actual data and an IntBuffer kvmeta that stores metadata for each record; kvmeta is not a separate allocation but an int‑typed view wrapped over the same kvbuffer array. Each record's metadata occupies four integers: value start offset, key start offset, partition, and value length. The buffer is divided by an equator from which key/value data grows clockwise and metadata grows counter‑clockwise, so the two regions consume the shared space from opposite ends.
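The per‑record layout corresponds to these constants in Hadoop's MapTask.MapOutputBuffer (comments added here for explanation):

// Each record's accounting info occupies NMETA ints (METASIZE bytes) in kvmeta.
private static final int VALSTART = 0;         // value offset in kvbuffer
private static final int KEYSTART = 1;         // key offset in kvbuffer
private static final int PARTITION = 2;        // partition of the record
private static final int VALLEN = 3;           // length of the value
private static final int NMETA = 4;            // number of meta ints per record
private static final int METASIZE = NMETA * 4; // metadata size in bytes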
Initialization occurs in MapOutputBuffer.init. The size of the buffer is determined by the configuration property mapreduce.task.io.sort.mb (default 100 MiB). After allocating kvbuffer and wrapping it as kvmeta, the equator is set to zero and the index pointers (bufstart, bufend, kvstart, kvend) are initialized.
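A condensed view of that initialization, abridged from MapOutputBuffer.init (validation and the many other fields are omitted):

// Abridged from MapTask.MapOutputBuffer.init; checks omitted.
final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB, 100);
int maxMemUsage = sortmb << 20;          // MiB -> bytes
maxMemUsage -= maxMemUsage % METASIZE;   // align to the metadata record size
kvbuffer = new byte[maxMemUsage];        // one array holds both data and metadata
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)       // metadata is an int view of kvbuffer
    .order(ByteOrder.nativeOrder())
    .asIntBuffer();
setEquator(0);                           // data and metadata start back to back
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;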
When a mapper calls NewOutputCollector.write, the data is handed to MapOutputBuffer.collect. The method first reserves space for metadata (bufferRemaining -= METASIZE) and, if necessary, triggers a spill. It then serializes the key and value into kvbuffer via the inner Buffer's write method, and finally records the four metadata ints in kvmeta, as sketched below.
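A simplified sketch of that path, abridged from MapOutputBuffer.collect (locking, the spill trigger, and the handling of oversized records are omitted here):

// Abridged from MapTask.MapOutputBuffer.collect; spill checks and
// exception handling omitted.
public synchronized void collect(K key, V value, final int partition)
    throws IOException {
  bufferRemaining -= METASIZE;          // reserve metadata space up front
  // ... if bufferRemaining <= 0, check the soft limit and maybe startSpill()
  int keystart = bufindex;
  keySerializer.serialize(key);         // writes into kvbuffer via Buffer.write
  if (bufindex < keystart) {
    bb.shiftBufferedKey();              // key wrapped: make it contiguous
    keystart = 0;
  }
  final int valstart = bufindex;
  valSerializer.serialize(value);
  bb.write(b0, 0, 0);                   // zero-length write marks the record end
  final int valend = bb.markRecord();
  // record the accounting info counter-clockwise in kvmeta
  kvmeta.put(kvindex + PARTITION, partition);
  kvmeta.put(kvindex + KEYSTART, keystart);
  kvmeta.put(kvindex + VALSTART, valstart);
  kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
  kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
}

The Buffer.write method that the serializers ultimately call handles wrap‑around by copying data in two parts when bufindex + len > bufvoid: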
public void write(byte b[], int off, int len) throws IOException {
  bufferRemaining -= len;
  if (bufferRemaining <= 0) {
    // spill logic omitted for brevity: acquire spillLock, check the
    // soft limit, and start a spill (or block) if space is exhausted
  }
  if (bufindex + len > bufvoid) {
    // the write would run past the end of the usable region: copy the
    // first part up to bufvoid, then wrap to the front of the array
    int gaplen = bufvoid - bufindex;
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    len -= gaplen;
    off += gaplen;
    bufindex = 0;
  }
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}

If a key wraps around the end of the buffer, bb.shiftBufferedKey() is invoked to make the key contiguous, since sorting compares raw key bytes and needs each key in one piece. The method either rearranges the buffer in place or, when there is not enough room, copies the wrapped part through a temporary array.
protected void shiftBufferedKey() throws IOException {
  // headbytelen: the tail of the key still sitting before the end of kvbuffer
  int headbytelen = bufvoid - bufmark;
  bufvoid = bufmark;              // shrink the usable region to the record mark
  int kvbidx = 4 * kvindex;
  int kvbend = 4 * kvend;
  int avail = Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
  if (bufindex + headbytelen < avail) {
    // enough room before the metadata region: shift the wrapped front part
    // of the key right, then move the tail from bufvoid down to offset 0,
    // leaving the whole key contiguous at the start of the buffer
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
    bufindex += headbytelen;
    bufferRemaining -= kvbuffer.length - bufvoid;
  } else {
    // not enough room: stage the wrapped part in a temporary array and
    // rewrite the whole key through the normal write path
    byte[] keytmp = new byte[bufindex];
    System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
    bufindex = 0;
    out.write(kvbuffer, bufmark, headbytelen);
    out.write(keytmp);
  }
}

When the buffer reaches its soft limit (by default 80% of capacity, set by mapreduce.map.sort.spill.percent), startSpill() is called. It records the current metadata and data boundaries (kvend, bufend), sets spillInProgress = true, and signals the spill thread.
private void startSpill() {
  // freeze the spill boundaries at the current record mark
  kvend = (kvindex + NMETA) % kvmeta.capacity();
  bufend = bufmark;
  spillInProgress = true;
  spillReady.signal();            // wake the background SpillThread
}

The equator is then recomputed so that writes can continue while the frozen region is spilled. The new position is derived from the average record size and the remaining distance: metadata is reserved in proportion to METASIZE / (METASIZE + avgRec), capped at half the remaining space, so at least half is left for serialized data. For example, with 16‑byte metadata (METASIZE) and an average record of 84 bytes, roughly 16% of the remaining distance is set aside for metadata.
int avgRec = (int) (mapOutputByteCounter.getCounter()
    / mapOutputRecordCounter.getCounter());
// space still available between the data index and the metadata region
// (kvbidx = 4 * kvindex, the metadata boundary in bytes)
int distkvi = distanceTo(bufindex, kvbidx);
// reserve metadata space proportional to METASIZE / (METASIZE + avgRec),
// at least 2 * METASIZE - 1 bytes and at most half of distkvi
int newPos = (bufindex + Math.max(2 * METASIZE - 1,
    Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE)))
    % kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
// bytes writable before the limits must be rechecked: the minimum of the
// metadata arc, the serialization arc, and the soft limit
bufferRemaining = Math.min(distanceTo(bufend, newPos),
    Math.min(distanceTo(newPos, 4 * kvend), softLimit)) - 2 * METASIZE;

The background SpillThread runs continuously. When spillInProgress becomes true, the thread wakes, releases the lock, and executes sortAndSpill(). After spilling, it resets the start pointers and clears the flag.
public void run() {
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();               // tell writers the last spill finished
      while (!spillInProgress) {
        spillReady.await();             // sleep until startSpill() signals
      }
      try {
        spillLock.unlock();             // spill without holding the lock
        sortAndSpill();
      } catch (Throwable t) {
        sortSpillException = t;         // surface the failure to the mapper
      } finally {
        spillLock.lock();
        // if the spilled region wrapped, restore the full usable length
        if (bufend < bufstart) bufvoid = kvbuffer.length;
        kvstart = kvend;                // reclaim the spilled regions
        bufstart = bufend;
        spillInProgress = false;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}

sortAndSpill() computes the range of records to write, sorts the metadata by partition and key using the configured sorter, and then writes each partition to a spill file. If a combiner is configured, it is applied to the records before writing. Index information is cached in memory and flushed to an index file when the cache exceeds indexCacheMemoryLimit.
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException {
  // approximate spill size: frozen data region plus per-partition headers
  long size = distanceTo(bufstart, bufend, bufvoid) + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    SpillRecord spillRec = new SpillRecord(partitions);
    Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);
    // metadata record range to sort: [mstart, mend)
    int mstart = kvend / NMETA;
    int mend = 1 + (kvstart >= kvend
        ? kvstart
        : kvmeta.capacity() + kvstart) / NMETA;
    // sort metadata by partition, then by key
    sorter.sort(this, mstart, mend, reporter);
    // write each partition, applying the combiner if one is configured
    for (int i = 0; i < partitions; ++i) {
      // writer logic omitted for brevity
    }
    // handle the index cache: flush to a file once the memory limit is hit
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}

In summary, Hadoop's circular buffer efficiently manages map output in memory, using a ring structure to separate data and metadata, dynamically adjusting the equator, and employing a dedicated spill thread to sort and persist data when memory thresholds are exceeded.