Build a Simple MQTT Client with Node.js Streams: Step‑by‑Step Guide
This article explains the MQTT protocol, demonstrates why streams are ideal for protocol implementation, and provides a complete Node.js example—including connection, subscription, publishing, heartbeat, and graceful shutdown—illustrated with clear code snippets and diagrams.
Introduction
MQTT (Message Queuing Telemetry Transport) is a lightweight publish/subscribe messaging protocol widely used in IoT and other scenarios that require reliable message delivery, such as device‑to‑device communication, real‑time messaging, and traffic management.
Human sensor -> MQTT Broker -> subscriber (e.g., smart bulb) User A sends a message -> MQTT Broker -> User B receives the message Vehicle A publishes brake warning -> MQTT Broker -> Vehicles B, C receive the warningStudying MQTT helps you understand protocol design and also deepens your knowledge of other TCP‑based protocols like HTTP and WebSocket.
Streams
What Is a Stream?
A stream processes data piece by piece as it becomes available, instead of loading the entire dataset into memory. Imagine moving water between two pools: the traditional method uses buckets (load all data at once), while a stream is like a pipe that continuously transfers water.
Code example using the bucket analogy:
const fs = require('fs');
fs.readFile('poolA.txt', 'utf8', (err, data) => {
if (err) {
console.error(err);
return;
}
// "data" is the bucket, its contents are the water
// In real scenarios you would check the size and split it if needed
fs.writeFile('poolB.txt', data, (err) => {
if (err) {
console.error(err);
return;
}
});
});Using a pipe (the water pipe) simplifies the process:
var fs = require('fs');
var readable = fs.createReadStream('poolA.txt');
var writable = fs.createWriteStream('poolB.txt');
readable.pipe(writable); // pipe works like a water pipeConversion diagram:
Data --> Water
Memory --> Pool
Stream --> PipeWhen a pipe connects two pools, water flows automatically, and the pipe can also control flow rate.
var fs = require('fs');
var readable = fs.createReadStream('poolA.txt');
var writable = fs.createWriteStream('poolB.txt');
readable.pipe(writable); // pipe works like a water pipeWhy Use Streams for Protocol Implementation?
From Stream Characteristics
Memory Efficiency : Streams process large files or data in small chunks, avoiding the need to load everything into memory.
Back‑pressure Control : When the consumer is slower than the producer, data is buffered and the producer pauses.
Pipe Operations : Streams can be connected via pipe, e.g., a readable stream directly piped to a writable stream.
From Open‑Source Library Usage
Node.js http library is built on streams.
// initiate connection
if (this.agent) {
this.agent.addRequest(this, optsWithoutSignal);
} else {
// No agent, default to Connection:close.
this._last = true;
this.shouldKeepAlive = false;
let opts = optsWithoutSignal;
if (opts.path || opts.socketPath) {
opts = { ...optsWithoutSignal };
if (opts.socketPath) {
opts.path = opts.socketPath;
} else if (opts.path) {
opts.path = undefined;
}
}
if (typeof opts.createConnection === 'function') {
const oncreate = once((err, socket) => {
if (err) {
process.nextTick(() => this.emit('error', err));
} else {
this.onSocket(socket);
}
});
try {
const newSocket = opts.createConnection(opts, oncreate);
if (newSocket) {
oncreate(null, newSocket);
}
} catch (err) {
oncreate(err);
}
} else {
debug('CLIENT use net.createConnection', opts);
// use net.createConnection to create a connection data stream
this.onSocket(net.createConnection(opts));
}
}Node‑Redis: implements the Redis protocol and provides a high‑performance Node.js client.
#createNetSocket(): CreateSocketReturn<net.Socket> {
return {
connectEvent: 'connect',
// use net.connect to create a connection data stream
socket: net.connect(this.#options as net.NetConnectOpts) // TODO
};
}
#createTlsSocket(): CreateSocketReturn<tls.TLSSocket> {
return {
connectEvent: 'secureConnect',
// use tls.connect to create a connection encrypted data stream
socket: tls.connect(this.#options as tls.ConnectionOptions) // TODO
};
}Mysql2: provides a MySQL protocol implementation for Node.js.
if (!opts.config.stream) {
if (opts.config.socketPath) {
this.stream = Net.connect(opts.config.socketPath);
} else {
this.stream = Net.connect(
opts.config.port,
opts.config.host
);
// Optionally enable keep-alive on the socket.
if (this.config.enableKeepAlive) {
this.stream.on('connect', () => {
this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
});
}
// Enable TCP_NODELAY flag so packets are sent immediately.
this.stream.setNoDelay(true);
}
} else if (typeof opts.config.stream === 'function') {
this.stream = opts.config.stream(opts);
} else {
this.stream = opts.config.stream;
}MQTT Protocol
What Is a Protocol?
A protocol is a set of rules that define how two or more devices communicate, including encoding, transmission, and error handling. Examples include HTTP, TCP, and UDP.
What Is MQTT?
MQTT (Message Queuing Telemetry Transport) is a TCP/IP‑based (or WebSocket‑based for some mini‑programs) publish/subscribe protocol. The official documentation is available in English and Chinese.
The overall client‑server interaction flow is shown below:
Design Approach
Based on the previous sections, we know the MQTT stages and the definition of streams. Before designing the protocol, we first create a simple code model to show how streams can automatically handle data processing on both client and server sides, achieving asynchronous non‑blocking parsing.
let completeParse = null
const writable = new Writable({
write(buf, _, done) {
console.log('writable stream :: parse buffer, buffer content: ', buf)
// simulate packet parsing
console.log('parser :: parsed data:', buf.toString())
// cache to queue
packets.push(buf.toString())
// done indicates one frame has been processed
completeParse = done
work()
},
})We then drive the next frame using callbacks:
const nextTickWork = () => {
if (packets.length) {
// if there is more data, schedule work before the next event loop
process.nextTick(work)
} else {
const done = completeParse
completeParse = null
console.log('nextTickWork :: done')
if (done) done()
}
}
const work = () => {
const packet = packets.shift()
console.log('work :: get next packet from queue: ', packet)
if (packet) {
// simulate processing packet and then call nextTickWork
console.log('process packet:', packet)
nextTickWork()
} else {
console.log('work :: no data in queue')
const done = completeParse
completeParse = null
if (done) done()
}
}The complete model code:
import fs from 'fs'
import path from 'path'
import { Writable } from 'stream'
const packets = []
const nextTickWork = () => {
if (packets.length) {
// schedule work before next event loop
process.nextTick(work)
} else {
const done = completeParse
completeParse = null
console.log('nextTickWork :: done')
if (done) done()
}
}
const work = () => {
const packet = packets.shift()
console.log('work :: get next packet from queue: ', packet)
if (packet) {
// simulate processing packet
console.log('process packet:', packet)
nextTickWork()
} else {
console.log('work :: no data in queue')
const done = completeParse
completeParse = null
if (done) done()
}
}
let completeParse = null
const writable = new Writable({
write(buf, _, done) {
console.log('writable stream :: parse buffer, buffer content: ', buf)
console.log('parser :: parsed data:', buf.toString())
packets.push(buf.toString())
completeParse = done
work()
},
})
// simulate reading a file and piping it into the writable stream
const readable = fs.createReadStream(path.join(__dirname, '1.txt'), {
highWaterMark: 1, // internal buffer size, default 64 * 1024, here set to 1KB
})
readable.pipe(writable)Connection Phase
Bit‑Stream Data Parsing and Generation
Data is transmitted as Buffers, so we must convert commands and payloads to Buffers when sending, and convert received Buffers back to readable text. For this tutorial we rely on an open‑source library to handle the low‑level packing and parsing.
Generating Buffer Data
const mqtt = require('mqtt-packet')
// command example – see protocol docs for details
const object = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: 10,
topic: 'test',
payload: 'test',
}
const opts = { protocolVersion: 4 } // we implement version 4
console.log(mqtt.generate(object))Parsing Buffer Data
const mqtt = require('mqtt-packet')
const opts = { protocolVersion: 4 }
const parser = mqtt.parser(opts)
parser.on('packet', packet => {
console.log(packet)
})Establishing the TCP Connection
Initiating TCP Connection
private createConnectionStream = (opts) => {
// default port
opts.port = opts.port || 1883
// default host
opts.hostname = opts.hostname || opts.host || 'localhost'
const { port } = opts
const host = opts.hostname
this.log('Port %d and host %s', port, host)
// create TCP connection
return net.createConnection(port, host)
}Building the Data Stream
After the TCP connection is established we construct the data Buffer stream. Instead of manually calling mqtt.generate, we use mqttPacket.writeToStream which both packs the data and writes it to the stream.
private _writePacket(packet, cb) {
this.log('_writePacket :: packet to send: %O', packet)
this.log('_writePacket :: sending packet')
this.log('_writePacket :: emit packetsend')
this.emit('packetsend', packet)
// write packet to stream
const result = mqttPacket.writeToStream(
packet,
this.stream,
this.options,
)
this.log('_writePacket :: write result: %s', result)
}Performing the Connection
connect() {
// subscribe parser to packet events
const parser = mqttPacket.parser(this.options)
// create the TCP stream
this.stream = this.createConnectionStream(this.options)
let completeParse = null
const work = () => {
const packet = packets.shift()
this.log('work :: get next packet: ', packet)
if (packet) {
this.log('work :: processing packet')
this._handlePacket(packet)
} else {
this.log('work :: no data in queue')
}
}
const writable = new Writable()
// writable._write is called when data arrives from the server
writable._write = (buf, _, done) => {
this.log('writable stream :: parse buffer: ', buf)
parser.parse(buf)
completeParse = done
work()
}
// pipe TCP stream into writable for parsing
this.stream.pipe(writable)
const packets = []
parser.on('packet', packet => {
this.log('parser :: push packet to queue: %s', packet)
packets.push(packet)
})
// send CONNECT packet
const connectPacket = {
cmd: 'connect',
protocolId: this.options.protocolId,
protocolVersion: this.options.protocolVersion,
clean: this.options.clean,
clientId: this.options.clientId,
keepalive: this.options.keepalive,
}
this._writePacket(connectPacket)
}Packet Handling Core
private _handlePacket(packet) {
const { cmd } = packet
this.log('_handlePacket :: handling command: %s', cmd)
switch (cmd) {
case 'connack':
// emit connect event for external listeners
this.emit('connect', packet)
break
default:
this.log('_handlePacket :: unknown command: %s', cmd)
this._noop()
}
}Topic Subscription Phase
After a successful connection we can subscribe to topics. A topic is analogous to a conversation channel (e.g., "technology", "politics", "life").
Subscribe Implementation
public subscribe(topic, callback) {
callback = callback || this.noop
const defaultOpts = { qos: 0 }
const subs = [{ topic, qos: defaultOpts.qos }]
const packet = {
cmd: 'subscribe',
subscriptions: subs,
messageId: this._nextId(),
}
this._writePacket(packet)
callback(null, subs)
return this
}Handling Subscription Response
private _handlePacket(packet, done) {
const { cmd } = packet
this.log('_handlePacket :: handling command: %s', cmd)
switch (cmd) {
case 'connack':
this.emit('connect', packet)
done()
break
case 'suback':
if (!packet.granted) {
this.log('suback :: subscription failed')
this.emit('error', packet)
} else {
this.log('suback :: subscription succeeded')
}
done()
break
default:
const msg = `_handlePacket :: unknown command: ${cmd}`
this.log(msg)
this.emit('error', msg)
done()
}
}Message Publishing Phase
Publish Implementation
public publish(topic, message, callback) {
this.log('publish :: send message "%s" to topic "%s"', message, topic)
const packet = {
cmd: 'publish',
topic,
payload: message,
qos: defaultQOS,
dup: false,
retain: false,
}
this._writePacket(packet)
return this
}Handling Publish Response
private _handlePacket(packet, done) {
const { cmd } = packet
this.log('_handlePacket :: handling command: %s', cmd)
switch (cmd) {
case 'publish':
const topic = packet.topic.toString()
const message = packet.payload
this.emit('message', topic, message as Buffer, packet)
done()
break
// other cases omitted for brevity
default:
const msg = `_handlePacket :: unknown command: ${cmd}`
this.log(msg)
this.emit('error', msg)
done()
}
}Heartbeat (Ping) Phase
The client and server keep the connection alive by periodically sending a ping.
Ping Check
private _checkPing() {
this.log('_checkPing :: checking heartbeat...')
if (this.pingResp) {
this.log('_checkPing :: ping response received, sending pingreq')
this.pingResp = false
this._writePacket({ cmd: 'pingreq' })
} else {
this.emit('error', new Error('Keepalive timeout'))
this.pingTimer = null
}
}
private _cleanPingTimer = () => {
clearTimeout(this.pingTimer)
this.pingTimer = null
this._reschedule()
}
private _reschedule() {
clearTimeout(this.pingTimer)
this.pingTimer = setTimeout(() => {
this._checkPing()
if (this.pingTimer) {
this._cleanPingTimer()
}
}, this.options.keepalive * 1000)
}
private _setupPingTimer() {
this.log('_setupPingTimer :: keepalive %d seconds', this.options.keepalive)
if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this._reschedule()
}
}Handling Ping Response
private _handlePacket(packet, done) {
const { cmd } = packet
this.log('_handlePacket :: handling command: %s', cmd)
switch (cmd) {
case 'pingresp':
this.pingResp = true
this.log('pingresp :: received PINGRESP')
done()
break
// other cases omitted for brevity
}
}Disconnect Phase
Initiating Disconnect
end() {
this.log('end :: closing connection')
// stop heartbeat timer
if (this.pingTimer) {
clearTimeout(this.pingTimer)
this.pingTimer = null
}
// send DISCONNECT packet
const packet = { cmd: 'disconnect' }
this._writePacket(packet)
return this
}Handling Disconnect Response
private _handlePacket(packet, done) {
const { cmd } = packet
this.log('_handlePacket :: handling command: %s', cmd)
switch (cmd) {
case 'disconnect':
this.stream.on('close', done)
this.stream.removeListener('close', done)
this.stream.destroy()
this.emit('disconnect', packet)
done()
break
// other cases omitted for brevity
}
}Conclusion
By extending EventEmitter and using net.createConnection, we built a compact yet complete MQTT client in a few hundred lines of code. The implementation demonstrates how streams drive asynchronous, non‑blocking data parsing and processing, and shows how events can be emitted for external consumers.
Understanding this example equips you to explore other protocol implementations (e.g., WebSocket, HTTP) and adapt the same stream‑centric design.
Features not covered in this tutorial include multi‑platform support (WebSocket for browsers), MQTT versions 3.0/5.0, QoS levels 1 and 2, reconnection logic, message caching, and encrypted communication.
Goodme Frontend Team
Regularly sharing the team's insights and expertise in the frontend field
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
