
Understanding Hadoop's Circular Buffer in the Shuffle Phase

This article explains how Hadoop's MapReduce shuffle uses a circular buffer to store serialized key/value pairs and their metadata, detailing its structure, initialization, write path, spill logic, and the background thread that sorts and writes data to disk.

Big Data Technology & Architecture

During the MapReduce shuffle, Hadoop stores map output in an in‑memory circular buffer (also called a ring buffer) that holds serialized key/value data and accompanying metadata. A circular buffer is a fixed‑size array whose read and write positions wrap around to the beginning when they reach the end, which makes full and empty states cheap to detect; the structure is general enough that hardware FIFOs use the same design.
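To make the mechanics concrete, here is a minimal, self-contained ring buffer (an illustrative sketch, not Hadoop's actual class, which tracks several cooperating indices instead of a single count):

```java
// Minimal fixed-size ring buffer sketch. Tracking a count makes
// "full" and "empty" O(1) checks.
public class RingBuffer {
    private final byte[] buf;
    private int head;   // next read position
    private int tail;   // next write position
    private int count;  // bytes currently stored

    public RingBuffer(int capacity) {
        buf = new byte[capacity];
    }

    public boolean isEmpty() { return count == 0; }
    public boolean isFull()  { return count == buf.length; }

    // Write one byte, wrapping the tail index around the array end.
    public void put(byte b) {
        if (isFull()) throw new IllegalStateException("buffer full");
        buf[tail] = b;
        tail = (tail + 1) % buf.length;
        count++;
    }

    // Read one byte, wrapping the head index the same way.
    public byte get() {
        if (isEmpty()) throw new IllegalStateException("buffer empty");
        byte b = buf[head];
        head = (head + 1) % buf.length;
        count--;
        return b;
    }
}
```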

The circular buffer consists of a byte array, kvbuffer, for the serialized records and an IntBuffer view, kvmeta, that stores metadata for each record. Each record's metadata occupies four integers (16 bytes): value start offset, key start offset, partition, and value length. The buffer is divided by an equator: key/value data grows in one direction from it while metadata grows in the other, so the two regions converge as the buffer fills.
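The dual view over one byte array can be sketched with java.nio; the NMETA and field-offset constants mirror MapOutputBuffer's, but the flat indexing here is simplified relative to Hadoop's wrapped kvmeta arithmetic (Hadoop also uses native byte order):

```java
import java.nio.ByteBuffer;
import java.nio.IntBuffer;

// One byte[] backs both the raw record bytes and an IntBuffer metadata
// view, as kvbuffer/kvmeta do. Indexing is simplified: kvindex here is a
// plain int offset into the IntBuffer view.
public class MetaLayout {
    static final int NMETA = 4;  // metadata ints per record
    static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3;

    public static void writeRecordMeta(byte[] kvbuffer, int kvindex,
            int valstart, int keystart, int partition, int vallen) {
        IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer).asIntBuffer();
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + VALLEN, vallen);
    }

    public static int[] readRecordMeta(byte[] kvbuffer, int kvindex) {
        IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer).asIntBuffer();
        int[] m = new int[NMETA];
        for (int i = 0; i < NMETA; i++) {
            m[i] = kvmeta.get(kvindex + i);
        }
        return m;
    }
}
```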

Initialization occurs in MapOutputBuffer.init. The size of the buffer is determined by the configuration property mapreduce.task.io.sort.mb (default 100 MiB). After allocating kvbuffer and wrapping it as the IntBuffer kvmeta, the equator is set to zero and the index pointers (bufstart, bufend, kvstart, kvend) are initialized to it.
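A simplified mirror of the sizing arithmetic, assuming sortmb stands in for mapreduce.task.io.sort.mb and spillper for mapreduce.map.sort.spill.percent (default 0.8):

```java
// Sketch of MapOutputBuffer.init's sizing step, not the full method.
public class BufferInit {
    static final int METASIZE = 16;  // 4 metadata ints * 4 bytes

    // Total buffer size: sortmb MiB, rounded down to a multiple of
    // METASIZE so metadata records never straddle the array end unevenly.
    public static int maxMemUsage(int sortmb) {
        int maxMem = sortmb << 20;
        return maxMem - (maxMem % METASIZE);
    }

    // Soft limit at which a background spill is triggered while the
    // mapper keeps writing into the remaining space.
    public static int softLimit(int maxMemUsage, float spillper) {
        return (int) (maxMemUsage * spillper);
    }
}
```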

When a mapper calls NewOutputCollector.write, the data is handed to MapOutputBuffer.collect. The method first reserves space for the record's metadata (bufferRemaining -= METASIZE) and, if the soft limit has been crossed, triggers a spill. It then serializes the key and value into kvbuffer via the write method and finally records the four metadata integers in kvmeta. The write method handles wrap‑around by copying the data in two parts when bufindex + len > bufvoid.

public void write(byte b[], int off, int len) throws IOException {
  bufferRemaining -= len;
  if (bufferRemaining <= 0) {
    // soft limit reached: spill logic omitted for brevity
  }
  if (bufindex + len > bufvoid) {
    // not enough room before the logical end: copy what fits up to
    // bufvoid, then wrap and continue from the start of the array
    int gaplen = bufvoid - bufindex;
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    len -= gaplen;
    off += gaplen;
    bufindex = 0;
  }
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}
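The two-part copy is easy to exercise in isolation; the following standalone sketch takes bufindex and bufvoid as parameters instead of fields:

```java
// Standalone version of the wrap-around write: copies len bytes starting
// at bufindex into a circular byte array whose logical end is bufvoid,
// and returns the new bufindex.
public class WrapWrite {
    public static int write(byte[] kvbuffer, int bufindex, int bufvoid,
                            byte[] b, int off, int len) {
        if (bufindex + len > bufvoid) {
            int gaplen = bufvoid - bufindex;   // room before the wrap point
            System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
            len -= gaplen;
            off += gaplen;
            bufindex = 0;                      // continue at the front
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        return bufindex + len;
    }
}
```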

If a key wraps around the end of the buffer, bb.shiftBufferedKey() is invoked to make the key contiguous. The method either copies the wrapped part to a temporary array or rearranges the buffer in place.

protected void shiftBufferedKey() throws IOException {
  // bytes of the key written before the wrap point
  int headbytelen = bufvoid - bufmark;
  bufvoid = bufmark;
  int kvbidx = 4 * kvindex;
  int kvbend = 4 * kvend;
  int avail = Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
  if (bufindex + headbytelen < avail) {
    // enough free space at the front: shift the tail right and copy the
    // head down, making the key contiguous in place
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
    bufindex += headbytelen;
    bufferRemaining -= kvbuffer.length - bufvoid;
  } else {
    // not enough room: stash the tail in a temporary array and rewrite
    // the whole key through the collector's output stream
    byte[] keytmp = new byte[bufindex];
    System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
    bufindex = 0;
    out.write(kvbuffer, bufmark, headbytelen);
    out.write(keytmp);
  }
}
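The underlying idea — reassembling a key whose bytes wrap from bufmark..bufvoid around to 0..bufindex — can be shown standalone (an illustrative helper, not Hadoop code, which rearranges the buffer rather than returning a new array):

```java
// Copies the two pieces of a wrapped key into one contiguous array:
// first the head (bufmark up to bufvoid), then the tail (0 up to bufindex).
public class KeyUnwrap {
    public static byte[] contiguousKey(byte[] kvbuffer, int bufmark,
                                       int bufvoid, int bufindex) {
        int headbytelen = bufvoid - bufmark;   // bytes before the wrap
        byte[] key = new byte[headbytelen + bufindex];
        System.arraycopy(kvbuffer, bufmark, key, 0, headbytelen);
        System.arraycopy(kvbuffer, 0, key, headbytelen, bufindex);
        return key;
    }
}
```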

When the buffer reaches its soft limit (mapreduce.map.sort.spill.percent of its capacity, 80% by default), startSpill() is called. It records the current metadata and data boundaries (kvend, bufend), sets spillInProgress = true, and signals the spill thread.

private void startSpill() {
  kvend = (kvindex + NMETA) % kvmeta.capacity();
  bufend = bufmark;
  spillInProgress = true;
  spillReady.signal();
}

The equator is then recomputed to free space for further writes. The new position is calculated using the average record size and the remaining distance, ensuring at least half the buffer is reserved for metadata.

int avgRec = (int)(mapOutputByteCounter.getCounter() / mapOutputRecordCounter.getCounter());
int distkvi = distanceTo(bufindex, kvbidx);
int newPos = (bufindex + Math.max(2 * METASIZE - 1,
    Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
bufferRemaining = Math.min(distanceTo(bufend, newPos),
    Math.min(distanceTo(newPos, 4 * kvend), softLimit)) - 2 * METASIZE;

The background SpillThread runs continuously. When spillInProgress becomes true, the thread wakes, releases the lock, and executes sortAndSpill(). After spilling, it resets the start pointers and clears the flag.

public void run() {
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();
      while (!spillInProgress) {
        spillReady.await();   // wait for startSpill() to signal
      }
      try {
        spillLock.unlock();   // release the lock while sorting and writing
        sortAndSpill();
      } catch (Throwable t) {
        // record the failure for the collector thread to rethrow
        sortSpillException = t;
      } finally {
        spillLock.lock();     // reacquire before updating shared pointers
        if (bufend < bufstart) bufvoid = kvbuffer.length;
        kvstart = kvend;
        bufstart = bufend;
        spillInProgress = false;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}
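The lock/condition handshake between startSpill() and the spill thread can be reduced to a runnable sketch (names are illustrative; the real classes share many more fields and loop forever):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal spillReady handshake: the caller sets a flag under the lock and
// signals; the worker awaits the flag, drops the lock while it "spills",
// then reacquires it to update shared state, mirroring SpillThread.run.
public class SpillHandoff {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean inProgress = false;
    public volatile boolean spilled = false;

    // Caller side, like startSpill(): set the flag and wake the worker.
    public void start() {
        lock.lock();
        try {
            inProgress = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    // Worker side: wait for the flag, work without the lock, then reset.
    public void runOnce() throws InterruptedException {
        lock.lock();
        try {
            while (!inProgress) {
                ready.await();
            }
            lock.unlock();    // release while doing the slow work
            spilled = true;   // stands in for sortAndSpill()
            lock.lock();      // reacquire before touching shared state
            inProgress = false;
        } finally {
            lock.unlock();
        }
    }
}
```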
sortAndSpill() computes the number of records to write, sorts the metadata by partition and key using the configured sorter, and then writes each partition to a spill file. If a combiner is configured, it is applied to the records before writing. Index information is cached in memory and flushed to an index file when the cache exceeds indexCacheMemoryLimit.

private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException {
  // approximate output size: serialized bytes plus per-partition header space
  long size = distanceTo(bufstart, bufend, bufvoid) + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    SpillRecord spillRec = new SpillRecord(partitions);
    Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);
    // sort the metadata records mstart..mend (derived from kvstart/kvend)
    // by partition, then by key
    sorter.sort(this, mstart, mend, reporter);
    // write each partition, applying the combiner if one is configured
    for (int i = 0; i < partitions; ++i) {
      // writer logic omitted for brevity
    }
    // keep the index in memory until the cache limit is exceeded
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills,
          partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
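The partition-then-key ordering the sorter applies can be illustrated with plain objects instead of kvmeta indices (an illustrative sketch, not Hadoop's in-place IndexedSorter over raw bytes):

```java
import java.util.Arrays;
import java.util.Comparator;

// Spill-time ordering: records compare first by partition, then by key,
// so each partition's records come out grouped together and key-sorted,
// ready to be written to the spill file one partition at a time.
public class SpillSort {
    public static class Rec {
        public final int partition;
        public final String key;
        public Rec(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }
    }

    public static Rec[] sortForSpill(Rec[] recs) {
        Rec[] out = recs.clone();
        Arrays.sort(out, Comparator.<Rec>comparingInt(r -> r.partition)
                                   .thenComparing(r -> r.key));
        return out;
    }
}
```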

In summary, Hadoop's circular buffer efficiently manages map output in memory, using a ring structure to separate data and metadata, dynamically adjusting the equator, and employing a dedicated spill thread to sort and persist data when memory thresholds are exceeded.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Java · Big Data · MapReduce · Hadoop · Shuffle · Circular Buffer · Spill
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert dedicated to sharing big data technology.
