Streams — Readable, Writable, Transform, and Piping

Streams are one of Node.js’s most powerful and most misunderstood abstractions. Instead of reading an entire file into memory before processing it, streams process data chunk by chunk as it arrives — enabling you to handle files larger than available RAM, pipe data between sources and destinations with minimal memory overhead, and build composable data transformation pipelines. Every major Node.js API is built on streams: HTTP requests and responses, file system reads and writes, TCP sockets, zlib compression, and crypto. Mastering streams is the difference between writing Node.js code that works and Node.js code that scales.

Stream Types

| Type | Direction | Key Events/Methods | Examples |
| --- | --- | --- | --- |
| Readable | Data source — read from | data, end, error, pipe(), read() | fs.createReadStream(), HTTP request body, process.stdin |
| Writable | Data sink — write to | write(), end(), drain, finish | fs.createWriteStream(), HTTP response, process.stdout |
| Duplex | Both readable and writable simultaneously | Both sets | TCP socket, WebSocket |
| Transform | Duplex that transforms data as it passes through | _transform(chunk, enc, cb) | zlib, crypto, CSV parser, JSON transformer |
| PassThrough | Transform that passes data unchanged | Inherits Transform | Testing, instrumentation, forking streams |
Note: Streams operate in two modes: flowing mode (data is delivered as soon as it arrives — attach a data listener or call pipe()) and paused mode (data is buffered until you explicitly call read()). Node.js v10+ introduced the async iteration API (for await...of stream), which is now the preferred modern pattern — it handles back-pressure automatically and integrates cleanly with async/await without managing event listeners manually.
Tip: Use stream.pipeline() instead of manually chaining .pipe() calls. pipeline(source, transform1, transform2, destination, callback) correctly handles error propagation and cleanup across all stages — if any stream in the chain errors, all the others are destroyed. The older pipe() method does not propagate errors: if the destination errors, pipe() merely unpipes the source, leaving it open and leaking its file descriptor. Use stream.pipeline() for all production stream compositions.
Warning: Back-pressure is the most commonly ignored stream concept. When a Readable produces data faster than a Writable can consume it, the internal buffer grows without bound — leading to memory exhaustion. The write() method returns false when the buffer is full; you must pause the source and wait for the drain event before resuming. pipe() and pipeline() handle this automatically — which is the primary reason to use them rather than manually calling write() in a data handler.

Complete Stream Examples

const fs       = require('fs');
const zlib     = require('zlib');
const crypto   = require('crypto');
const { pipeline, Transform, PassThrough } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);  // Node 15+ also ships a promise form: require('stream/promises').pipeline

// ── Reading a file with streams ───────────────────────────────────────────
// Modern: async iteration
async function readFileChunks(filePath) {
    const stream = fs.createReadStream(filePath, { encoding: 'utf8', highWaterMark: 64 * 1024 });
    let totalBytes = 0;
    for await (const chunk of stream) {
        totalBytes += chunk.length;
        // process chunk without holding entire file in memory
    }
    return totalBytes;
}

// ── pipeline() — composable, error-safe stream chaining ───────────────────
// Compress and encrypt a large file
async function compressAndEncrypt(inputPath, outputPath, password) {
    const key = crypto.scryptSync(password, 'salt', 32);  // use a unique random salt in production
    const iv  = crypto.randomBytes(16);                   // persist the IV (e.g. prepend it to the output) or the file cannot be decrypted

    await pipelineAsync(
        fs.createReadStream(inputPath),                          // source
        zlib.createGzip(),                                       // compress
        crypto.createCipheriv('aes-256-cbc', key, iv),          // encrypt
        fs.createWriteStream(outputPath),                        // destination
    );
    console.log('Done — file compressed and encrypted');
}

// ── Custom Transform stream ───────────────────────────────────────────────
// Transform that converts CSV rows to JSON objects
class CSVToJSON extends Transform {
    constructor(options = {}) {
        // Byte/string input on the writable side, one object per row on the readable side
        super({ ...options, readableObjectMode: true });
        this._headers  = null;
        this._buffer   = '';
    }

    _transform(chunk, encoding, callback) {
        this._buffer += chunk.toString();
        const lines   = this._buffer.split('\n');
        this._buffer  = lines.pop();  // keep incomplete last line for next chunk

        for (const line of lines) {
            const trimmed = line.trim();
            if (!trimmed) continue;
            const values  = trimmed.split(',').map(v => v.trim());

            if (!this._headers) {
                this._headers = values;   // first line is headers
            } else {
                const obj = {};
                this._headers.forEach((h, i) => { obj[h] = values[i] ?? ''; });
                this.push(obj);           // emit one object per row
            }
        }
        callback();
    }

    _flush(callback) {
        if (this._buffer.trim() && this._headers) {
            const values = this._buffer.split(',').map(v => v.trim());
            const obj    = {};
            this._headers.forEach((h, i) => { obj[h] = values[i] ?? ''; });
            this.push(obj);
        }
        callback();
    }
}

// ── Streaming HTTP response to file ───────────────────────────────────────
const https = require('https');

async function downloadFile(url, dest) {
    await new Promise((resolve, reject) => {
        https.get(url, response => {
            if (response.statusCode !== 200) {
                response.resume();  // drain the response so the socket is freed
                return reject(new Error(`Request failed with status ${response.statusCode}`));
            }
            const file = fs.createWriteStream(dest);
            pipeline(response, file, err => err ? reject(err) : resolve());
        }).on('error', reject);
    });
}

// ── Back-pressure handling (manual) ──────────────────────────────────────
function copyWithBackpressure(readable, writable) {
    readable.on('data', chunk => {
        const ok = writable.write(chunk);
        if (!ok) {
            readable.pause();                 // buffer full — pause source
            writable.once('drain', () => {
                readable.resume();            // buffer drained — resume source
            });
        }
    });
    readable.on('end', () => writable.end());
}

// ── Readable stream from array (for testing) ──────────────────────────────
const { Readable } = require('stream');

async function* generateTasks(count) {
    for (let i = 0; i < count; i++) {
        yield JSON.stringify({ id: i, title: `Task ${i}`, priority: 'medium' }) + '\n';
    }
}

// Use a generator as a Readable stream (wrapped in a function —
// top-level await is not available in CommonJS modules)
async function writeTasks() {
    await pipelineAsync(
        Readable.from(generateTasks(10000)),
        fs.createWriteStream('/tmp/tasks.ndjson'),
    );
}

How It Works

Step 1 — Streams Process Data in Chunks Using Internal Buffers

A highWaterMark (default 16 KiB for byte streams and 16 objects for object-mode streams; fs.createReadStream uses 64 KiB) controls the maximum amount of data buffered internally. When the buffer fills, the stream signals back-pressure. This small, bounded buffer is what gives streams their constant-memory characteristic: whatever the file size, memory usage stays proportional to highWaterMark.

Step 2 — pipeline() Propagates Errors and Destroys All Streams

When any stream in a pipeline() chain emits an error, all other streams in the chain are automatically destroyed and their resources freed. Without this, a failed destination stream leaves the source still open and the intermediate streams buffering data with no place to go. The callback receives the error; note that bytes already flushed to the destination before the failure are not rolled back, so clean up partial output yourself if that matters. This is the error handling that .pipe() does not provide.

Step 3 — Transform Streams Implement _transform and _flush

_transform(chunk, encoding, callback) is called for each incoming chunk. Call this.push(transformedData) to emit output (zero, one, or multiple pushes per chunk). Call callback() when done processing the chunk. _flush(callback) is called once after all input has been processed — use it to emit any buffered state (like the incomplete CSV line in the example).

Step 4 — Object Mode Removes the Bytes Constraint

By default, streams work with Buffer or string chunks. With objectMode: true, streams can emit and receive any JavaScript value — objects, arrays, numbers (a Transform can also set readableObjectMode and writableObjectMode independently for each side). The CSV-to-JSON Transform emits one plain JavaScript object per CSV row, allowing downstream consumers to work with typed data rather than raw bytes.

Step 5 — async Iteration Simplifies Stream Consumption

for await (const chunk of stream) is the cleanest way to consume a Readable stream. It automatically handles back-pressure (pauses the stream when the loop body is not ready for more data), errors (throws as the loop’s caught exception), and the end condition (the loop terminates). It eliminates the need to manually manage data, end, and error events.

Common Mistakes

Mistake 1 — Using .pipe(), which does not propagate destination errors

❌ Wrong — source keeps streaming after destination errors:

fs.createReadStream('big.csv').pipe(fs.createWriteStream('/full-disk/out.csv'));
// If the write fails: the write stream emits 'error', but the read stream is left open (leaked fd)

✅ Correct — use pipeline() for error propagation:

await pipelineAsync(
    fs.createReadStream('big.csv'),
    fs.createWriteStream('/full-disk/out.csv')
);  // error thrown — both streams cleaned up

Mistake 2 — Loading the entire stream into memory to process it

❌ Wrong — loads 2GB file entirely into memory:

const data = fs.readFileSync('huge-export.csv');  // 2GB in RAM
const lines = data.toString().split('\n');

✅ Correct — process line by line with streams:

await pipelineAsync(
    fs.createReadStream('huge-export.csv'),
    new CSVToJSON(),
    new LineProcessor(),  // process each row (your own object-mode Transform)
);  // constant memory regardless of file size

Quick Reference

| Task | Code |
| --- | --- |
| Read file as stream | fs.createReadStream(path, { highWaterMark: 64*1024 }) |
| Write file as stream | fs.createWriteStream(path) |
| Compose streams safely | await pipelineAsync(src, transform, dest) |
| Custom Transform | Extend Transform, implement _transform and _flush |
| Consume stream | for await (const chunk of stream) { ... } |
| Stream from generator | Readable.from(asyncGenerator()) |
| Gzip stream | zlib.createGzip() in pipeline |

🧠 Test Yourself

A Readable stream produces data faster than the Writable can consume it. The Writable’s write() returns false. What should happen next, and which approach handles this automatically?