Build a Simple MQTT Client with Node.js Streams: Step‑by‑Step Guide

This article explains the MQTT protocol, demonstrates why streams are ideal for protocol implementation, and provides a complete Node.js example—including connection, subscription, publishing, heartbeat, and graceful shutdown—illustrated with clear code snippets and diagrams.

Goodme Frontend Team

Introduction

MQTT (Message Queuing Telemetry Transport) is a lightweight publish/subscribe messaging protocol widely used in IoT and other scenarios that require reliable message delivery, such as device‑to‑device communication, real‑time messaging, and traffic management.

Human sensor -> MQTT Broker -> subscriber (e.g., smart bulb)
User A sends a message -> MQTT Broker -> User B receives the message
Vehicle A publishes brake warning -> MQTT Broker -> Vehicles B, C receive the warning

Studying MQTT helps you understand protocol design and also deepens your knowledge of other TCP‑based protocols like HTTP and WebSocket.

Streams

What Is a Stream?

A stream processes data piece by piece as it becomes available, instead of loading the entire dataset into memory. Imagine moving water between two pools: the traditional method uses buckets (load all data at once), while a stream is like a pipe that continuously transfers water.

Stream analogy

Code example using the bucket analogy:

const fs = require('fs');

fs.readFile('poolA.txt', 'utf8', (err, data) => {
  if (err) {
    console.error(err);
    return;
  }
  // "data" is the bucket, its contents are the water
  // In real scenarios you would check the size and split it if needed
  fs.writeFile('poolB.txt', data, (err) => {
    if (err) {
      console.error(err);
      return;
    }
  });
});

Using a pipe (the water pipe) simplifies the process:

var fs = require('fs');

var readable = fs.createReadStream('poolA.txt');
var writable = fs.createWriteStream('poolB.txt');

readable.pipe(writable); // pipe works like a water pipe

Conversion diagram:

Data   --> Water
Memory --> Pool
Stream --> Pipe

When a pipe connects two pools, water flows automatically, and the pipe can also control flow rate.

Pipe flow

Why Use Streams for Protocol Implementation?

From Stream Characteristics

Memory Efficiency: Streams process large files or data in small chunks, avoiding the need to load everything into memory.

Back‑pressure Control: When the consumer is slower than the producer, the stream buffers data up to a limit (its highWaterMark) and then signals the producer to pause until the buffer drains.

Pipe Operations: Streams can be connected via pipe, e.g., a readable stream piped directly into a writable stream.
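
Back-pressure is easiest to see with a consumer that is slower than its producer. The sketch below is illustrative only: `slowConsumer` and the 4-byte `highWaterMark` are made-up values chosen so the signal appears after two small writes.

```javascript
const { Writable } = require('node:stream');

// A deliberately slow consumer: it acknowledges each chunk on a later tick.
const slowConsumer = new Writable({
  highWaterMark: 4, // bytes the stream buffers before signalling back-pressure
  write(chunk, _encoding, done) {
    setImmediate(done);
  },
});

// write() returns false once the buffered bytes reach highWaterMark.
const first = slowConsumer.write(Buffer.alloc(2));
const second = slowConsumer.write(Buffer.alloc(2));
console.log(first, second);

// A well-behaved producer stops writing at this point and resumes on 'drain'.
slowConsumer.once('drain', () => {
  console.log('drained, safe to write again');
});
```

`pipe` performs this pause-and-resume handshake automatically, which is why the earlier file-copy example needs no explicit flow control.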

From Open‑Source Library Usage

The Node.js http module is built on streams: its client obtains the underlying socket stream from net.createConnection.

// initiate connection
  if (this.agent) {
    this.agent.addRequest(this, optsWithoutSignal);
  } else {
    // No agent, default to Connection:close.
    this._last = true;
    this.shouldKeepAlive = false;
    let opts = optsWithoutSignal;
    if (opts.path || opts.socketPath) {
      opts = { ...optsWithoutSignal };
      if (opts.socketPath) {
        opts.path = opts.socketPath;
      } else if (opts.path) {
        opts.path = undefined;
      }
    }
    if (typeof opts.createConnection === 'function') {
      const oncreate = once((err, socket) => {
        if (err) {
          process.nextTick(() => this.emit('error', err));
        } else {
          this.onSocket(socket);
        }
      });
      try {
        const newSocket = opts.createConnection(opts, oncreate);
        if (newSocket) {
          oncreate(null, newSocket);
        }
      } catch (err) {
        oncreate(err);
      }
    } else {
      debug('CLIENT use net.createConnection', opts);
      // use net.createConnection to create a connection data stream
      this.onSocket(net.createConnection(opts));
    }
  }

Node‑Redis: implements the Redis protocol and provides a high‑performance Node.js client.

#createNetSocket(): CreateSocketReturn<net.Socket> {
        return {
            connectEvent: 'connect',
            // use net.connect to create a connection data stream
            socket: net.connect(this.#options as net.NetConnectOpts) // TODO
        };
    }

    #createTlsSocket(): CreateSocketReturn<tls.TLSSocket> {
        return {
            connectEvent: 'secureConnect',
            // use tls.connect to create a connection encrypted data stream
            socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO
        };
    }

Mysql2: provides a MySQL protocol implementation for Node.js.

if (!opts.config.stream) {
  if (opts.config.socketPath) {
    this.stream = Net.connect(opts.config.socketPath);
  } else {
    this.stream = Net.connect(
      opts.config.port,
      opts.config.host
    );
    // Optionally enable keep-alive on the socket.
    if (this.config.enableKeepAlive) {
      this.stream.on('connect', () => {
        this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
      });
    }
    // Enable TCP_NODELAY flag so packets are sent immediately.
    this.stream.setNoDelay(true);
  }
} else if (typeof opts.config.stream === 'function') {
  this.stream = opts.config.stream(opts);
} else {
  this.stream = opts.config.stream;
}

MQTT Protocol

What Is a Protocol?

A protocol is a set of rules that define how two or more devices communicate, including encoding, transmission, and error handling. Examples include HTTP, TCP, and UDP.

What Is MQTT?

MQTT (Message Queuing Telemetry Transport) is a TCP/IP‑based (or WebSocket‑based for some mini‑programs) publish/subscribe protocol. The official documentation is available in English and Chinese.

The overall client‑server interaction flow is shown below:

MQTT interaction flow

Design Approach

The previous sections introduced streams and the stages of an MQTT session. Before implementing the protocol, we first build a small model that shows how a Writable stream can drive packet handling: each incoming chunk is parsed into a queue, the queue is drained, and only then is the stream told to deliver the next chunk. The result is asynchronous, non‑blocking parsing.

let completeParse = null

const writable = new Writable({
  write(buf, _, done) {
    console.log('writable stream :: parse buffer, buffer content: ', buf)
    // simulate packet parsing
    console.log('parser :: parsed data:', buf.toString())
    // cache to queue
    packets.push(buf.toString())

    // done indicates one frame has been processed
    completeParse = done
    work()
  },
})

We then drive the next frame using callbacks:

const nextTickWork = () => {
  if (packets.length) {
    // if there is more data, schedule work before the next event loop
    process.nextTick(work)
  } else {
    const done = completeParse
    completeParse = null
    console.log('nextTickWork :: done')
    if (done) done()
  }
}

const work = () => {
  const packet = packets.shift()
  console.log('work :: get next packet from queue: ', packet)
  if (packet) {
    // simulate processing packet and then call nextTickWork
    console.log('process packet:', packet)
    nextTickWork()
  } else {
    console.log('work :: no data in queue')
    const done = completeParse
    completeParse = null
    if (done) done()
  }
}

The complete model code:

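import fs from 'fs'
import path from 'path'
import { fileURLToPath } from 'url'
import { Writable } from 'stream'

// __dirname is not defined in ES modules, so derive it from import.meta.url
const __dirname = path.dirname(fileURLToPath(import.meta.url))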

const packets = []

const nextTickWork = () => {
  if (packets.length) {
    // schedule work before next event loop
    process.nextTick(work)
  } else {
    const done = completeParse
    completeParse = null
    console.log('nextTickWork :: done')
    if (done) done()
  }
}

const work = () => {
  const packet = packets.shift()
  console.log('work :: get next packet from queue: ', packet)
  if (packet) {
    // simulate processing packet
    console.log('process packet:', packet)
    nextTickWork()
  } else {
    console.log('work :: no data in queue')
    const done = completeParse
    completeParse = null
    if (done) done()
  }
}

let completeParse = null

const writable = new Writable({
  write(buf, _, done) {
    console.log('writable stream :: parse buffer, buffer content: ', buf)
    console.log('parser :: parsed data:', buf.toString())
    packets.push(buf.toString())
    completeParse = done
    work()
  },
})

// simulate reading a file and piping it into the writable stream
const readable = fs.createReadStream(path.join(__dirname, '1.txt'), {
  highWaterMark: 1, // read 1 byte per chunk (the default for fs read streams is 64 * 1024 bytes)
})
readable.pipe(writable)

Connection Phase

Bit‑Stream Data Parsing and Generation

Data travels over the socket as Buffers, so outgoing commands and payloads must be encoded into Buffers, and incoming Buffers must be decoded back into packet objects. For this tutorial we rely on the open‑source mqtt-packet library to handle the low‑level encoding and parsing.
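
To make the encoding step concrete, here is a minimal hand-written sketch of the bytes behind a QoS 0 PUBLISH packet in MQTT 3.1.1. The helper names `encodeRemainingLength` and `encodePublishQos0` are hypothetical and exist only for this illustration; the tutorial itself leaves this work to the library.

```javascript
// Encode the MQTT "remaining length" field: 7 data bits per byte,
// with the high bit set when another length byte follows.
function encodeRemainingLength(length) {
  const out = [];
  do {
    let digit = length % 128;
    length = Math.floor(length / 128);
    if (length > 0) digit |= 0x80;
    out.push(digit);
  } while (length > 0);
  return Buffer.from(out);
}

// Build a QoS 0 PUBLISH packet by hand (QoS 0 carries no packet identifier).
function encodePublishQos0(topic, payload) {
  const topicBytes = Buffer.from(topic, 'utf8');
  const payloadBytes = Buffer.from(payload, 'utf8');
  const topicLength = Buffer.alloc(2);
  topicLength.writeUInt16BE(topicBytes.length, 0);
  const body = Buffer.concat([topicLength, topicBytes, payloadBytes]);
  const fixedHeader = Buffer.from([0x30]); // packet type 3 (PUBLISH), all flags 0
  return Buffer.concat([fixedHeader, encodeRemainingLength(body.length), body]);
}

const publishBytes = encodePublishQos0('test', 'test');
console.log(publishBytes);
```

The packet is a one-byte type/flags header, a variable-length size field, and then the body. A parser uses that same size field to tell where one packet ends and the next begins.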

Generating Buffer Data

const mqtt = require('mqtt-packet')
// command example – see protocol docs for details
const object = {
  cmd: 'publish',
  retain: false,
  qos: 0,
  dup: false,
  length: 10,
  topic: 'test',
  payload: 'test',
}
const opts = { protocolVersion: 4 } // protocol level 4 is MQTT 3.1.1
// generate() returns a Buffer that is ready to be written to the socket
console.log(mqtt.generate(object, opts))

Parsing Buffer Data

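const mqtt = require('mqtt-packet')
const opts = { protocolVersion: 4 }
const parser = mqtt.parser(opts)
// 'packet' fires once for every complete packet found in the parsed bytes
parser.on('packet', packet => {
  console.log(packet)
})
// feed raw bytes to the parser: a QoS 0 PUBLISH of "test" to topic "test"
parser.parse(Buffer.from([
  48, 10, // fixed header (publish) and remaining length
  0, 4, // topic length
  116, 101, 115, 116, // topic "test"
  116, 101, 115, 116, // payload "test"
]))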
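
TCP delivers a byte stream, so one chunk may hold half a packet or several packets. The sketch below shows the framing idea a parser needs under that constraint. It is a simplified illustration, not mqtt-packet's implementation, and `takePacket` and `PacketFramer` are hypothetical names.

```javascript
// Try to read one complete packet from the front of `buffer`.
// Returns { packet, rest }, or null when more bytes are needed.
function takePacket(buffer) {
  if (buffer.length < 2) return null;
  let remainingLength = 0;
  let multiplier = 1;
  let offset = 1; // byte 0 holds the packet type and flags
  let byte;
  do {
    if (offset > 4) throw new Error('Malformed remaining length');
    if (offset >= buffer.length) return null; // length field is incomplete
    byte = buffer[offset++];
    remainingLength += (byte & 0x7f) * multiplier;
    multiplier *= 128;
  } while (byte & 0x80);
  const end = offset + remainingLength;
  if (buffer.length < end) return null; // body not fully received yet
  return { packet: buffer.subarray(0, end), rest: buffer.subarray(end) };
}

// Accumulates chunks and hands every complete packet to `onPacket`.
class PacketFramer {
  constructor(onPacket) {
    this.pending = Buffer.alloc(0);
    this.onPacket = onPacket;
  }

  push(chunk) {
    this.pending = Buffer.concat([this.pending, chunk]);
    let result;
    while ((result = takePacket(this.pending)) !== null) {
      this.onPacket(result.packet);
      this.pending = result.rest;
    }
  }
}

const publish = Buffer.from([0x30, 10, 0, 4, 116, 101, 115, 116, 116, 101, 115, 116]);
const pingresp = Buffer.from([0xd0, 0]);
const received = [];
const framer = new PacketFramer((packet) => received.push(packet));

framer.push(publish.subarray(0, 5)); // first part of PUBLISH: nothing emitted yet
framer.push(Buffer.concat([publish.subarray(5), pingresp])); // rest of PUBLISH plus a PINGRESP
console.log(received.length);
```

Because the framer keeps leftover bytes between calls, the caller can pass chunks exactly as the socket delivers them.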

Establishing the TCP Connection

Initiating TCP Connection

private createConnectionStream = (opts) => {
  // default port
  opts.port = opts.port || 1883
  // default host
  opts.hostname = opts.hostname || opts.host || 'localhost'

  const { port } = opts
  const host = opts.hostname

  this.log('Port %d and host %s', port, host)
  // create TCP connection
  return net.createConnection(port, host)
}

Building the Data Stream

After the TCP connection is established we construct the data Buffer stream. Instead of manually calling mqtt.generate, we use mqttPacket.writeToStream which both packs the data and writes it to the stream.

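private _writePacket(packet, cb) {
  this.log('_writePacket :: packet to send: %O', packet)
  this.log('_writePacket :: emit packetsend')
  this.emit('packetsend', packet)
  // serialize the packet and write it to the stream;
  // the result is false when the stream's internal buffer is full
  const result = mqttPacket.writeToStream(
    packet,
    this.stream,
    this.options,
  )
  this.log('_writePacket :: write result: %s', result)
  if (!cb) return
  if (result) {
    cb()
  } else {
    // wait until the buffer drains before reporting completion
    this.stream.once('drain', cb)
  }
}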

Performing the Connection

connect() {
  // the parser turns incoming bytes into packet objects
  const parser = mqttPacket.parser(this.options)
  // queue of parsed packets waiting to be handled
  const packets = []
  // create the TCP stream
  this.stream = this.createConnectionStream(this.options)
  // callback that tells the writable the current chunk is fully handled
  let completeParse = null

  const nextTickWork = () => {
    if (packets.length) {
      // more packets came from the same chunk: handle the next one
      process.nextTick(work)
    } else {
      // queue drained: release the writable so it accepts the next chunk
      const done = completeParse
      completeParse = null
      if (done) done()
    }
  }

  const work = () => {
    const packet = packets.shift()
    this.log('work :: get next packet: %O', packet)
    if (packet) {
      this.log('work :: processing packet')
      this._handlePacket(packet, nextTickWork)
    } else {
      this.log('work :: no data in queue')
      // the chunk held no complete packet: still release the writable
      nextTickWork()
    }
  }

  // collect every packet the parser produces
  parser.on('packet', packet => {
    this.log('parser :: push packet to queue: %O', packet)
    packets.push(packet)
  })

  const writable = new Writable()
  // writable._write is called when data arrives from the server
  writable._write = (buf, _, done) => {
    this.log('writable stream :: parse buffer: %O', buf)
    completeParse = done
    parser.parse(buf)
    work()
  }

  // pipe TCP stream into writable for parsing
  this.stream.pipe(writable)

  // send CONNECT packet
  const connectPacket = {
    cmd: 'connect',
    protocolId: this.options.protocolId,
    protocolVersion: this.options.protocolVersion,
    clean: this.options.clean,
    clientId: this.options.clientId,
    keepalive: this.options.keepalive,
  }
  this._writePacket(connectPacket)
}

Packet Handling Core

private _handlePacket(packet, done) {
  const { cmd } = packet
  this.log('_handlePacket :: handling command: %s', cmd)
  switch (cmd) {
    case 'connack':
      // emit connect event for external listeners
      this.emit('connect', packet)
      done()
      break
    default:
      this.log('_handlePacket :: unknown command: %s', cmd)
      // always call done, otherwise the stream never delivers the next chunk
      done()
  }
}
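
The methods in this article assume a surrounding class that provides `this.options`, `this.log`, and event emission, but the article never shows it. A minimal sketch of such a shell might look like the following. The class name `MiniMqttClient`, the default values, and the `DEBUG` check are assumptions made for illustration.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical shell for the methods shown in this article.
class MiniMqttClient extends EventEmitter {
  constructor(options = {}) {
    super();
    this.options = {
      protocolId: 'MQTT', // protocol name used by MQTT 3.1.1
      protocolVersion: 4, // protocol level used by MQTT 3.1.1
      clean: true, // ask the broker for a clean session
      keepalive: 60, // seconds between heartbeats
      clientId: `mini_${Math.random().toString(16).slice(2, 10)}`,
      ...options, // caller-supplied values override the defaults
    };
    this.stream = null; // set by connect()
    this.pingTimer = null; // used by the heartbeat logic
    this.pingResp = false;
  }

  // Print debug output only when the DEBUG environment variable is set.
  log(format, ...args) {
    if (process.env.DEBUG) console.log(format, ...args);
  }

  noop() {}
}

const client = new MiniMqttClient({ clientId: 'demo-client', keepalive: 30 });
client.on('connect', (packet) => console.log('connected', packet));
```

Extending EventEmitter is what lets `_handlePacket` report results with `this.emit(...)` while callers subscribe with `client.on(...)`.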

Topic Subscription Phase

After a successful connection we can subscribe to topics. A topic is analogous to a conversation channel (e.g., "technology", "politics", "life").

Subscribe Implementation

public subscribe(topic, callback) {
  callback = callback || this.noop
  const defaultOpts = { qos: 0 }
  const subs = [{ topic, qos: defaultOpts.qos }]
  const packet = {
    cmd: 'subscribe',
    subscriptions: subs,
    messageId: this._nextId(),
  }
  this._writePacket(packet)
  callback(null, subs)
  return this
}
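
`subscribe` relies on `this._nextId()`, which this article does not show. MQTT packet identifiers are 16-bit and must be non-zero, so one plausible sketch is a counter that wraps from 65535 back to 1. The name `createIdAllocator` is hypothetical, and a production client would also need to skip identifiers that are still in flight.

```javascript
// Hypothetical helper: hands out packet identifiers 1..65535, then wraps.
function createIdAllocator(start = 1) {
  let next = start;
  return function nextId() {
    const id = next;
    next = next === 65535 ? 1 : next + 1;
    return id;
  };
}

const nextId = createIdAllocator();
console.log(nextId(), nextId());
```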

Handling Subscription Response

private _handlePacket(packet, done) {
  const { cmd } = packet
  this.log('_handlePacket :: handling command: %s', cmd)
  switch (cmd) {
    case 'connack':
      this.emit('connect', packet)
      done()
      break
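    case 'suback':
      // granted holds one return code per requested topic; 128 (0x80) means the broker refused it
      if (!packet.granted || packet.granted.includes(128)) {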
        this.log('suback :: subscription failed')
        this.emit('error', packet)
      } else {
        this.log('suback :: subscription succeeded')
      }
      done()
      break
    default:
      const msg = `_handlePacket :: unknown command: ${cmd}`
      this.log(msg)
      this.emit('error', msg)
      done()
  }
}
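
In MQTT 3.1.1 a SUBACK carries one return code per requested topic, in request order: 0, 1, or 2 for the granted QoS, and 0x80 for failure. The sketch below pairs each request with its result. `summarizeSuback` is a hypothetical helper and not part of the client above.

```javascript
const SUBACK_FAILURE = 0x80;

// Pair each requested subscription with the return code the broker sent back.
function summarizeSuback(subscriptions, granted) {
  return subscriptions.map((sub, index) => {
    const code = granted[index];
    const ok = code === 0 || code === 1 || code === 2;
    return {
      topic: sub.topic,
      requestedQos: sub.qos,
      grantedQos: ok ? code : null, // null when the broker refused the topic
      ok,
    };
  });
}

const summary = summarizeSuback(
  [{ topic: 'sensors/door', qos: 0 }, { topic: 'admin/secret', qos: 0 }],
  [0, SUBACK_FAILURE],
);
console.log(summary);
```

With a per-topic summary, a client can report exactly which subscription was refused instead of failing the whole request.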

Message Publishing Phase

Publish Implementation

public publish(topic, message, callback) {
  this.log('publish :: send message "%s" to topic "%s"', message, topic)
  const packet = {
    cmd: 'publish',
    topic,
    payload: message,
    qos: 0, // this tutorial only implements QoS 0 (no acknowledgement)
    dup: false,
    retain: false,
  }
  this._writePacket(packet)
  return this
}

Handling Publish Response

private _handlePacket(packet, done) {
  const { cmd } = packet
  this.log('_handlePacket :: handling command: %s', cmd)
  switch (cmd) {
    case 'publish':
      const topic = packet.topic.toString()
      const message = packet.payload
      this.emit('message', topic, message as Buffer, packet)
      done()
      break
    // other cases omitted for brevity
    default:
      const msg = `_handlePacket :: unknown command: ${cmd}`
      this.log(msg)
      this.emit('error', msg)
      done()
  }
}

Heartbeat (Ping) Phase

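The client keeps the connection alive by sending a PINGREQ packet every keepalive interval; the server answers each one with a PINGRESP. If no PINGRESP has arrived by the time the next check runs, the client treats the connection as dead. The timer is armed by _setupPingTimer, which should be called once the connection succeeds (for example, when CONNACK arrives).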

Ping Check

private _checkPing() {
  this.log('_checkPing :: checking heartbeat...')
  if (this.pingResp) {
    this.log('_checkPing :: ping response received, sending pingreq')
    this.pingResp = false
    this._writePacket({ cmd: 'pingreq' })
  } else {
    this.emit('error', new Error('Keepalive timeout'))
    this.pingTimer = null
  }
}

private _cleanPingTimer = () => {
  clearTimeout(this.pingTimer)
  this.pingTimer = null
  this._reschedule()
}

private _reschedule() {
  clearTimeout(this.pingTimer)
  this.pingTimer = setTimeout(() => {
    this._checkPing()
    if (this.pingTimer) {
      this._cleanPingTimer()
    }
  }, this.options.keepalive * 1000)
}

private _setupPingTimer() {
  this.log('_setupPingTimer :: keepalive %d seconds', this.options.keepalive)
  if (!this.pingTimer && this.options.keepalive) {
    this.pingResp = true
    this._reschedule()
  }
}
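
The heartbeat decision can be isolated from the timers, which makes it easier to reason about and to test. The sketch below is a standalone illustration and not the client's actual code. `KeepaliveMonitor` is a hypothetical name, and it tracks "awaiting a response" rather than the `pingResp` flag used above.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical helper: the caller supplies the function that sends PINGREQ.
class KeepaliveMonitor extends EventEmitter {
  constructor(sendPing) {
    super();
    this.sendPing = sendPing;
    this.awaitingResponse = false;
  }

  // Run once per keepalive interval (for example from a timer).
  check() {
    if (this.awaitingResponse) {
      // The previous PINGREQ was never answered.
      this.emit('timeout');
      return false;
    }
    this.awaitingResponse = true;
    this.sendPing();
    return true;
  }

  // Run when a PINGRESP packet arrives.
  onPingResp() {
    this.awaitingResponse = false;
  }
}

const sent = [];
const monitor = new KeepaliveMonitor(() => sent.push('pingreq'));
let timedOut = false;
monitor.on('timeout', () => { timedOut = true; });

monitor.check(); // interval 1: sends PINGREQ
monitor.onPingResp(); // broker answered in time
monitor.check(); // interval 2: sends PINGREQ again
monitor.check(); // interval 3: still no answer, so report a timeout
console.log(sent.length, timedOut);
```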

Handling Ping Response

private _handlePacket(packet, done) {
  const { cmd } = packet
  this.log('_handlePacket :: handling command: %s', cmd)
  switch (cmd) {
    case 'pingresp':
      this.pingResp = true
      this.log('pingresp :: received PINGRESP')
      done()
      break
    // other cases omitted for brevity
  }
}

Disconnect Phase

Initiating Disconnect

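end() {
  this.log('end :: closing connection')
  // stop heartbeat timer
  if (this.pingTimer) {
    clearTimeout(this.pingTimer)
    this.pingTimer = null
  }
  // send DISCONNECT packet
  const packet = { cmd: 'disconnect' }
  this._writePacket(packet)
  // an MQTT 3.1.1 broker does not reply to DISCONNECT, so close our side
  // of the socket once the pending data has been flushed
  this.stream.end()
  return this
}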

Handling Disconnect Response

private _handlePacket(packet, done) {
  const { cmd } = packet
  this.log('_handlePacket :: handling command: %s', cmd)
  switch (cmd) {
    case 'disconnect':
      // only an MQTT 5.0 broker sends DISCONNECT; a 3.1.1 broker simply closes the socket
      this.stream.destroy()
      this.emit('disconnect', packet)
      done()
      break
    // other cases omitted for brevity
  }
}

Conclusion

By extending EventEmitter and using net.createConnection, we built a compact MQTT client that covers the core of the protocol in a few hundred lines of code. The implementation demonstrates how streams drive asynchronous, non‑blocking data parsing and processing, and shows how events can be emitted for external consumers.

Understanding this example equips you to explore other protocol implementations (e.g., WebSocket, HTTP) and adapt the same stream‑centric design.

Features not covered in this tutorial include multi‑platform support (WebSocket for browsers), other MQTT versions (3.1 and 5.0), QoS levels 1 and 2, reconnection logic, message caching, and encrypted communication.
