
Efficient Large File Processing in Node.js: Stream, Buffer, and Multithreading Techniques

This article explains how to handle massive HDFS‑derived text and CSV files in Node.js by analyzing memory and CPU bottlenecks, comparing client‑side upload strategies, and presenting synchronous, callback, promise, streaming, and multithreaded worker‑thread solutions with complete code examples.

Rare Earth Juejin Tech Community

Background: The business scenario involves extracting generated data files (txt, csv) from HDFS, compressing and uploading to object storage, with file sizes ranging from kilobytes to dozens of gigabytes and billions of rows, causing frequent memory and CPU alerts.

Problem analysis: loading whole files into memory exhausts the V8 heap, overloads the CPU (the Node.js process was observed spiking above 440% CPU), and triggers I/O storms from repeated reads and writes.

Client‑side upload approaches: (1) JSON + base64 – simple but inflates payload and consumes browser resources; (2) FormData – streams binary directly but lacks nested object support and requires more code; (3) Token‑based request then upload to object storage – reduces front‑end logic but adds extra round‑trips.
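Approach (2) can be sketched with the standard FormData/Blob APIs, which are global in browsers and in Node 18+ (the upload URL below is a placeholder):

```javascript
// Build a multipart body: the file travels as raw binary, avoiding the
// roughly 33% payload inflation that base64 encoding adds.
const form = new FormData();
const file = new Blob(['col1,col2\n1,2\n'], { type: 'text/csv' });
form.append('file', file, 'report.csv');
form.append('taskId', '42'); // flat fields only: nested objects must be flattened or JSON-encoded

// Upload (placeholder URL; identical in the browser):
// await fetch('https://example.com/upload', { method: 'POST', body: form });

console.log(form.get('taskId')); // prints 42
```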

Node.js file I/O methods:

const fs = require('node:fs');
// Synchronous read (blocks the event loop; only safe for small files)
const data = fs.readFileSync('/path/to/small/file.txt', 'utf8');
console.log(data);
// Synchronous write
fs.writeFileSync('/path/to/output.txt', 'Hello World', 'utf8');

const fs = require('node:fs');
// Asynchronous (callback) read
fs.readFile('/path/to/small/file.txt', 'utf8', (err, data) => {
  if (err) throw err;
  console.log(data);
  // Asynchronous write
  fs.writeFile('/path/to/output.txt', data, 'utf8', (err) => {
    if (err) throw err;
  });
});

const fs = require('node:fs/promises');
async function readWriteFile() {
  try {
    // Promise-based read
    const data = await fs.readFile('/path/to/small/file.txt', 'utf8');
    console.log(data);
    // Promise-based write
    await fs.writeFile('/path/to/output.txt', data, 'utf8');
  } catch (err) {
    console.error(err);
  }
}
readWriteFile();

const fs = require('node:fs');
const readline = require('node:readline');
// Streaming read
const readStream = fs.createReadStream('/path/to/small/file.txt', {
  encoding: 'utf8',
  highWaterMark: 1024 // bytes per read
});
// Streaming write
const writeStream = fs.createWriteStream('/path/to/output.txt');
// Process line by line
const rl = readline.createInterface({
  input: readStream,
  crlfDelay: Infinity
});
rl.on('line', (line) => {
  // Respect backpressure: pause reading while the write buffer drains
  if (!writeStream.write(`${line}\n`)) {
    rl.pause();
    writeStream.once('drain', () => rl.resume());
  }
});
rl.on('close', () => {
  writeStream.end();
});

const fs = require('node:fs');
function copyLargeFile(src, dest) {
  return new Promise((resolve, reject) => {
    // 1. Readable stream with 128 MB chunks
    const readStream = fs.createReadStream(src, {
      highWaterMark: 128 * 1024 * 1024
    });
    // 2. Writable stream with a 64 MB buffer
    const writeStream = fs.createWriteStream(dest, {
      highWaterMark: 64 * 1024 * 1024
    });
    // 3. Pipe them together with error handling
    readStream
      .on('error', reject)
      .pipe(writeStream)
      .on('error', reject)
      .on('finish', () => {
        console.log(`File ${src} copied to ${dest}`);
        resolve();
      });
  });
}
// Usage
copyLargeFile('xxx.txt', 'copy_xxx.txt')
  .catch(err => console.error('Copy failed:', err));

const fs = require('node:fs');
const { Buffer } = require('node:buffer');
// Low-level read through a file descriptor into a preallocated Buffer
// (the whole file still ends up in memory, so this only suits small files)
fs.open('/path/to/small/file.txt', 'r', (err, fd) => {
  if (err) throw err;
  const stats = fs.fstatSync(fd);
  const buffer = Buffer.alloc(stats.size);
  fs.read(fd, buffer, 0, buffer.length, 0, (err) => {
    if (err) throw err;
    console.log(buffer.toString('utf8'));
    fs.close(fd, (err) => {
      if (err) throw err;
    });
  });
});

The current implementation pipes fs.createReadStream → split → through2 → fs.createWriteStream; converting every line to a string inside split and through2 becomes a bottleneck on very large files.

Suggested improvements: replace line‑by‑line string processing with pure Buffer handling, avoid split and readline, and enable multithreading via worker_threads to process file chunks in parallel.
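The Buffer-level idea can be sketched in isolation: Buffer.indexOf locates newlines without decoding the whole file to a string, so only the bytes of lines that actually need transformation are ever converted (bufferLines is an illustrative helper, not part of the final solution):

```javascript
// Iterate the lines of a Buffer at the byte level: indexOf scans raw
// bytes, and each yielded line is a zero-copy view (subarray) into the
// original buffer.
const NL = 0x0a; // '\n'

function* bufferLines(buf) {
  let start = 0;
  while (start < buf.length) {
    const idx = buf.indexOf(NL, start);
    if (idx === -1) {
      yield buf.subarray(start); // trailing line without a newline
      return;
    }
    yield buf.subarray(start, idx);
    start = idx + 1;
  }
}

const data = Buffer.from('foo\nbar\nbaz');
const lines = [...bufferLines(data)].map(b => b.toString('utf8'));
console.log(lines); // [ 'foo', 'bar', 'baz' ]
```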

Final solution (simplified): the main thread locates delimiter positions, splits the file into byte ranges (one worker per range, bounded in practice by the CPU core count), each worker streams its range and processes lines as Buffers, and the main thread merges the ordered results into the output file.

import * as fs from 'fs';
import * as os from 'os';
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

// Configuration constants
const FILE_DELIMITER = 'YOUR_FILE_DELIMITER';
const EOL = os.EOL;
const EOL_BUFFER = Buffer.from(EOL);
const DELIMITER_LINE = Buffer.from(FILE_DELIMITER + EOL);
const WORKER_COUNT = os.cpus().length;

// Main-thread preprocessing: scan the file chunk by chunk and record the
// byte offset of every delimiter line, carrying boundary bytes forward
async function findDelimiterPositions(filePath: string) {
  const fileSize = (await fs.promises.stat(filePath)).size;
  const chunkSize = 1024 * 1024;
  const positions: number[] = [];
  let previousRemaining = Buffer.alloc(0);

  const fd = await fs.promises.open(filePath, 'r');
  try {
    for (let offset = 0; offset < fileSize;) {
      const buffer = Buffer.alloc(chunkSize);
      const { bytesRead } = await fd.read(buffer, 0, chunkSize, offset);
      if (!bytesRead) break;

      const combined = Buffer.concat([previousRemaining, buffer.subarray(0, bytesRead)]);
      const base = offset - previousRemaining.length; // file offset of combined[0]

      let pos = 0;
      let idx: number;
      while ((idx = combined.indexOf(DELIMITER_LINE, pos)) !== -1) {
        positions.push(base + idx);
        pos = idx + DELIMITER_LINE.length;
      }
      // Keep only enough tail bytes to detect a delimiter split across chunks
      previousRemaining = combined.subarray(
        Math.max(pos, combined.length - (DELIMITER_LINE.length - 1))
      );

      offset += bytesRead;
    }
  } finally {
    await fd.close();
  }
  return positions;
}

// Worker-thread logic: stream one byte range and emit processed lines
function workerProcess() {
  const { filePath, start, end, filePo } = workerData;
  const stream = fs.createReadStream(filePath, { start, end });
  const delimiterBuffer = Buffer.from(FILE_DELIMITER);
  const result: Buffer[] = [];
  let linePo = 0;
  let buffer = Buffer.alloc(0);

  const handleLine = (line: Buffer) => {
    // Skip delimiter lines entirely
    if (line.equals(delimiterBuffer)) return;
    const transformed = processLine(line, linePo, filePo);
    result.push(Buffer.from(transformed + EOL));
    linePo++;
  };

  return new Promise<void>((resolve) => {
    stream.on('data', (chunk: Buffer) => {
      buffer = Buffer.concat([buffer, chunk]);

      let eolIndex: number;
      while ((eolIndex = buffer.indexOf(EOL_BUFFER)) !== -1) {
        handleLine(buffer.subarray(0, eolIndex));
        buffer = buffer.subarray(eolIndex + EOL_BUFFER.length);
      }
    });

    stream.on('end', () => {
      // Flush a trailing line, ignoring a delimiter truncated at the range boundary
      if (buffer.length > 0 && !delimiterBuffer.subarray(0, buffer.length).equals(buffer)) {
        handleLine(buffer);
      }
      parentPort.postMessage({
        filePo,
        data: Buffer.concat(result)
      });
      resolve();
    });
  });
}

// Line transformation hook (replace with your own logic)
function processLine(lineBuffer: Buffer, linePo: number, filePo: number) {
  return lineBuffer.toString() + `_processed_${filePo}_${linePo}`;
}

// Main-thread logic
async function main() {
  const filePath = process.argv[2];
  const outputPath = process.argv[3];

  const delimiterPositions = await findDelimiterPositions(filePath);
  const fileSize = (await fs.promises.stat(filePath)).size;
  const parts = [];
  let prevPos = 0;

  // Build the byte range each worker will process
  for (const pos of delimiterPositions) {
    parts.push({ start: prevPos, end: pos, filePo: parts.length });
    prevPos = pos + DELIMITER_LINE.length;
  }
  parts.push({ start: prevPos, end: fileSize, filePo: parts.length });

  // Spawn one worker per part (cap at WORKER_COUNT for files with many segments)
  const results = new Map();

  parts.forEach((part) => {
    // __filename assumes the TypeScript is compiled to CommonJS
    const worker = new Worker(__filename, {
      workerData: { ...part, filePath }
    });

    worker.on('error', (err) => {
      console.error(`Worker ${part.filePo} failed:`, err);
      process.exit(1);
    });

    worker.on('message', (msg) => {
      results.set(msg.filePo, msg.data);
      if (results.size === parts.length) {
        // All workers are done: write results back in their original order
        const sorted = Array.from(results.keys()).sort((a, b) => a - b);
        const ws = fs.createWriteStream(outputPath);
        sorted.forEach(po => ws.write(results.get(po)));
        ws.end();
      }
    });
  });
}

// Entry point
if (isMainThread) {
  main();
} else {
  workerProcess();
}

Conclusion: using Buffer‑level byte processing, stream pipelines, and worker‑thread parallelism dramatically reduces memory consumption, CPU load, and I/O overhead for massive file processing tasks.

Tags: performance, Node.js, multithreading, file I/O, streams
Written by Rare Earth Juejin Tech Community (Juejin, a tech community that helps developers grow).