
Understanding Node.js Readable Streams: Internals, API, and Implementation

This article explains the fundamentals and internal mechanics of Node.js readable streams, covering their history, data structures like BufferList, state management, key APIs such as push, read, pipe, and how they inherit from EventEmitter to enable efficient, low‑memory data processing.

Tencent IMWeb Frontend Team

1. Basic Concepts

1.1 History of Streams

Streams are not unique to Node.js; they originated decades ago in Unix operating systems, where programs can interact via the pipe operator (|).

Both macOS and Linux, being Unix-like systems, support the pipe operator, allowing the output of the left-hand process to become the input of the right-hand process.

In Node.js, the traditional readFile approach reads the entire file into memory before any processing can occur.

This approach has two drawbacks:

Memory consumption: the entire file must be held in memory at once.

Time: processing starts only after the whole payload has been loaded.

To solve these problems, Node.js adopted the stream concept. There are four types of streams in Node.js, all of which are instances of EventEmitter:

Readable Stream

Writable Stream

Duplex Stream (readable & writable)

Transform Stream

To study streams step by step, this article starts with readable streams.

1.2 What Is a Stream?

A stream is an abstract data structure that represents a collection of data. When objectMode === false, the only allowed data types are string, Buffer, and Uint8Array.

We can think of a stream as a container of these data pieces, like a liquid stored in a buffer (BufferList). When the appropriate event fires, the liquid is poured into a pipe and the other side can retrieve it for processing.

1.3 What Is a Readable Stream?

A readable stream is a type of stream that has two reading modes and three states.

Two reading modes:

Flowing mode: data is read from the underlying system and emitted to registered event handlers as quickly as possible.

Paused mode: no data is read; the Stream.read() method must be called explicitly to pull data.

Three states (readableFlowing):

readableFlowing === null – no consumption mechanism exists, so no data is produced; calling Stream.pipe() or Stream.resume() changes it to true, starting data production and event emission.

readableFlowing === false – data flow is paused, but data generation may continue, so data accumulates in the internal buffer (back-pressure).

readableFlowing === true – data is produced and consumed normally.

2. Basic Principles

2.1 Internal State Definition (ReadableState)

ReadableState

<code>_readableState: ReadableState {
  objectMode: false, // set true to handle non‑string/Buffer/null types
  highWaterMark: 16384, // default 16 KB, stops calling _read() when exceeded
  buffer: BufferList { head: null, tail: null, length: 0 }, // internal buffer list
  length: 0, // total size of data in the readable stream (objectMode ? buffer.length : byte length)
  pipes: [], // list of piped destinations
  flowing: null, // null / false / true
  ended: false, // true when all data has been consumed
  endEmitted: false,
  reading: false,
  constructed: true,
  sync: true,
  needReadable: false,
  emittedReadable: false,
  readableListening: false,
  resumeScheduled: false,
  errorEmitted: false,
  emitClose: true,
  autoDestroy: true,
  destroyed: false,
  errored: null,
  closed: false,
  closeEmitted: false,
  defaultEncoding: 'utf8',
  awaitDrainWriters: null,
  multiAwaitDrain: false,
  readingMore: false,
  dataEmitted: false,
  decoder: null,
  encoding: null,
  [Symbol(kPaused)]: null
}</code>

2.2 Internal Data Storage Implementation (BufferList)

BufferList is the container used by streams to store internal data. It is implemented as a linked list with three properties: head, tail, and length.

Each node in the BufferList is a BufferNode; the type of its data depends on objectMode.

This structure provides faster head access than Array.prototype.shift().
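To make the head/tail/length idea concrete, here is a minimal sketch of the same structure; this is an illustrative toy, not Node's actual BufferList implementation:

```javascript
// A singly linked list with head/tail pointers: shifting the head is O(1),
// unlike Array.prototype.shift(), which must move every remaining element.
class SimpleBufferList {
  constructor() { this.head = null; this.tail = null; this.length = 0; }
  push(data) {
    const node = { data, next: null }; // each node mirrors a BufferNode
    if (this.tail) this.tail.next = node;
    else this.head = node;             // first node is both head and tail
    this.tail = node;
    ++this.length;
  }
  shift() {
    if (this.length === 0) return undefined;
    const { data } = this.head;        // detach the head in constant time
    this.head = this.head.next;
    if (this.head === null) this.tail = null;
    --this.length;
    return data;
  }
}

const list = new SimpleBufferList();
list.push(Buffer.from('a'));
list.push(Buffer.from('b'));
console.log(list.shift().toString(), list.length); // a 1
```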

2.2.1 Data Storage Types

If objectMode === true, data can be of any type; whatever is pushed is stored as-is.

objectMode = true

<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: true,
  read() {}
});

readableStream.push({ name: 'lisa' });
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push('lisa');
console.log(readableStream._readableState.buffer.tail);
readableStream.push(666);
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);
</code>

Each logged tail node is a BufferNode whose data holds the pushed value unchanged.

If objectMode === false, data can only be string, Buffer, or Uint8Array.

objectMode = false

<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: false,
  read() {}
});
readableStream.push({ name: 'lisa' });
</code>

This push fails: in non-object mode the stream rejects a plain object with an ERR_INVALID_ARG_TYPE error.

2.2.2 Data Storage Structure

When we create a readable stream in the Node REPL and push data, we first need to implement the _read method or provide a read function in the constructor options.

<code>const Stream = require('stream');
const readableStream = new Stream.Readable();
readableStream._read = function(size) {};
</code>

or

<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(size) {}
});
</code>

After readableStream.push('abc'), the buffer contains a BufferNode whose data is a Buffer holding the ASCII codes of the string 'abc'. The BufferList's length property reflects the number of stored chunks, not the byte size.

2.2.3 Related API

Printing all BufferList methods yields the following image (omitted for brevity). Apart from

join

(which serialises the list to a string), the other methods perform data access operations. Below are the most relevant ones.

2.2.3.1 consume

Source: BufferList.consume

<code>// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // slice works for both buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    return this.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this._getString(n) : this._getBuffer(n);
}
</code>

The function has three branches:

If the requested length is smaller than the head node, it returns the first n bytes (or characters) and updates the head.

If the requested length equals the head node's length, it returns the whole head node.

If the requested length is larger, it delegates to _getString or _getBuffer depending on the underlying storage type.

2.2.3.2 _getBuffer

Source: BufferList._getBuffer

<code>// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
        TypedArrayPrototypeSet(ret,
          new Uint8Array(buf.buffer, buf.byteOffset, n),
          retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
</code>

It iterates over the linked list, copying data into a newly allocated Buffer until the requested length is satisfied.

2.2.3.3 _getString

Source: BufferList._getString

<code>// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = '';
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
      ret += str;
      n -= str.length;
    } else {
      if (n === str.length) {
        ret += str;
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
        ret += StringPrototypeSlice(str, 0, n);
        this.head = p;
        p.data = StringPrototypeSlice(str, n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
</code>

The logic mirrors _getBuffer but works with strings.

2.3 Why Is a Readable Stream an Instance of EventEmitter?

Understanding the publish‑subscribe pattern is essential: callbacks are stored in a queue and invoked later, separating producers from consumers. Node.js streams follow this pattern, which is why they inherit from EventEmitter.

The inheritance chain is built by first making the Stream constructor inherit from EventEmitter, then making Readable inherit from Stream:

<code>function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
</code>

Later, Readable inherits from Stream in the same way:

<code>ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
</code>

Thus:

Readable.prototype.__proto__ === Stream.prototype

Stream.prototype.__proto__ === EE.prototype

Readable.prototype.__proto__.__proto__ === EE.prototype

Traversing this chain shows that Readable streams inherit all EventEmitter methods.

2.4 Implementation of Core APIs

Below are the most important methods of a readable stream, shown in the order they appear in the source.

2.4.1 push

push adds a chunk to the internal buffer or emits it directly when the stream is flowing.

<code>Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
</code>

The pseudo-code for readableAddChunk highlights the main flow:

<code>function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream._readableState;
  if (chunk === null) { // end of stream
    state.reading = false;
    onEofChunk(stream, state);
  } else if (!state.objectMode) {
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk);
    } else if (chunk instanceof Buffer) {
      // handle encoding …
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
    } else if (chunk != null) {
      const err = new ERR_INVALID_ARG_TYPE('chunk', ['string','Buffer','Uint8Array'], chunk);
      errorOrDestroy(stream, err);
    }
  }
  if (state.objectMode || (chunk && chunk.length > 0)) {
    // insert chunk into buffer
    addChunk(stream, state, chunk, true);
  }
}
</code>

Key points:

When objectMode is false, string chunks are converted to Buffers.

When objectMode is true, the chunk is passed through unchanged.

If the stream is flowing and there are 'data' listeners, the chunk is emitted immediately; otherwise it is stored in the buffer.

2.4.2 read

read first calls the user‑implemented

_read

method (or a Promise‑based version) and then extracts data from the internal buffer.

<code>// Adjust highWaterMark if needed
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);
}
try {
  const result = this._read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === 'function') {
      then.call(result, nop, function(err) {
        errorOrDestroy(this, err);
      });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}
</code>

The core extraction logic lives in fromList:

<code>function fromList(n, state) {
  if (state.length === 0) return null;
  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) {
    // read everything
    if (state.decoder)
      ret = state.buffer.join('');
    else if (state.buffer.length === 1)
      ret = state.buffer.first();
    else
      ret = state.buffer.concat(state.length);
    state.buffer.clear();
  } else {
    // read n bytes
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}
</code>

2.4.3 _read

The user must provide

_read

. Inside it they typically call

push

repeatedly and finally push

null

to signal the end of the stream.

<code>const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// outputs: abcdefghijklmnopqrstuvwxyz
</code>

2.4.4 pipe (important)

pipe attaches one or more writable streams to the readable stream and switches the readable into flowing mode.

<code>Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;
  state.pipes.push(dest);
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }
  dest.emit('pipe', src);
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    src.resume();
  }
  return dest;
};
</code>

It works similarly to the Unix pipe operator, moving data from the left side to the right side.

2.4.5 resume

resume switches a stream from paused to flowing mode unless a 'readable' listener is present.

<code>Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    state.flowing = !state.readableListening;
    resume(this, state);
  }
};
function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}
function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
}
function flow(stream) {
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null);
}
</code>

2.4.6 pause

pause stops the 'data' events and keeps incoming data in the internal buffer.

<code>Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
</code>

2.5 Usage and Working Mechanism

2.5.1 Working Mechanism

The conditions that trigger mode transitions for a readable stream are summarised below.

needReadable === true: set in paused mode when the buffer size is at or below highWaterMark, when a 'readable' listener is attached, or when a read request finds no data.

push: in flowing mode emits 'data' immediately; otherwise stores the chunk and may emit 'readable' when needed.

read: a zero-length read triggers 'readable' once the buffer reaches or exceeds the highWaterMark; otherwise it consumes data and emits 'data'.

resume: has no effect if a 'readable' listener exists; otherwise it switches the stream to flowing mode and drains the buffer.

'readable' is emitted when a listener is attached and data is available, when push adds data while needReadable is true, or when a zero-length read finds sufficient buffered data.

3. Summary

Node.js implements its own stream system to avoid memory and latency problems by processing data in small chunks.

Streams are not exclusive to Node.js; they originated in Unix decades ago.

There are four stream types—readable, writable, duplex, and transform—all inheriting from EventEmitter.

The underlying container is a BufferList, a custom linked‑list structure with head and tail pointers.

Readable streams have two modes (flowing and paused) and three internal states governing data production.

Using streams enables chainable data processing and flexible composition of transformation steps.

Written by

Tencent IMWeb Frontend Team

IMWeb Frontend Community gathering frontend development enthusiasts. Follow us for refined live courses by top experts, cutting‑edge technical posts, and to sharpen your frontend skills.
