Advanced Readable Streams: Custom Streams and Manual Reading

You've learned how to use readable streams with the 'data' event for automatic reading. But what if you need more control? What if you want to create your own streams or read data manually? Let's discover the advanced techniques that give you complete mastery over stream behavior.

Quick Reference

When to use these techniques:

  • Creating custom data sources as streams
  • Need precise control over when to read data
  • Building stream processing libraries
  • Implementing custom backpressure strategies

Key concepts:

// Manual reading with 'readable' event
stream.on("readable", () => {
  let chunk;
  while ((chunk = stream.read()) !== null) {
    process(chunk);
  }
});

// Creating custom streams
class MyStream extends Readable {
  _read() {
    this.push(data);
  }
}

What You Need to Know First

Required reading (in order):

  1. Readable Streams: Processing Data Without Memory Overload - You must understand basic stream concepts first

Technical prerequisites:

  • Strong understanding of Node.js events
  • Comfortable with async/await and Promises
  • Familiarity with class-based programming in JavaScript/TypeScript

Tools you'll need:

  • Node.js v18 or higher
  • TypeScript (optional but recommended)

What We'll Cover in This Article

By the end of this guide, you'll master:

  • The 'readable' event and manual reading
  • When to use manual vs automatic reading
  • Creating custom readable streams from scratch
  • Using async iterators with streams
  • Piping streams together efficiently
  • Advanced flow control techniques
  • Performance tuning for streams

What We'll Explain Along the Way

Advanced concepts explained with examples:

  • The _read() internal method
  • Push mode vs pull mode streams
  • Object mode streams
  • Stream composition patterns
  • Error propagation in stream pipelines

The 'readable' Event: Manual Reading Control

So far, we've used the 'data' event where the stream automatically pushes chunks to you. But sometimes you need to pull data manually when you're ready. That's where the 'readable' event comes in.

Understanding Pull vs Push

Let's visualize the difference:

PUSH MODE ('data' event):
Stream → [Automatic] → Your Code
"Here's data!" "Here's more!" "And more!"

PULL MODE ('readable' event):
Stream ← [Your Request] ← Your Code
"Give me data when I ask for it"

Basic Manual Reading

import { createReadStream } from "fs";

const stream = createReadStream("data.txt", {
  encoding: "utf-8",
  highWaterMark: 64 * 1024,
});

// Manual reading approach
stream.on("readable", () => {
  console.log("📢 Data is available to read");

  let chunk;

  // Read all available data
  while ((chunk = stream.read()) !== null) {
    console.log(`Read ${chunk.length} bytes`);
    console.log(`Buffer still has: ${stream.readableLength} bytes`);

    // Process this chunk
    processChunk(chunk);
  }

  console.log("Buffer empty, waiting for more data...");
});

stream.on("end", () => {
  console.log("✅ All data has been read");
});

function processChunk(chunk: string): void {
  // Your processing logic
  console.log(`Processing: ${chunk.substring(0, 50)}...`);
}

What's happening:

  1. 'readable' event fires when data is available
  2. You call stream.read() to get data
  3. Returns null when buffer is empty
  4. Event fires again when more data arrives

Reading Specific Amounts

Here's where manual reading really shines - you can read exact byte amounts:

import { createReadStream } from "fs";

/**
 * Read a binary file format with fixed-size headers
 * Example: PNG file format (simplified - a real parser would track its
 * position across 'readable' events instead of re-reading the signature
 * every time the event fires)
 */
function parseBinaryFile(filePath: string): void {
  const stream = createReadStream(filePath);

  stream.on("readable", () => {
    // Read PNG signature (8 bytes)
    const signature = stream.read(8);
    if (signature) {
      console.log("PNG signature:", signature);
    }

    // Read chunk length (4 bytes)
    const length = stream.read(4);
    if (length) {
      const chunkSize = length.readUInt32BE(0);
      console.log(`Next chunk size: ${chunkSize} bytes`);

      // Read the chunk data (exact size)
      const chunkData = stream.read(chunkSize);
      if (chunkData) {
        console.log(`Read chunk: ${chunkData.length} bytes`);
      }
    }
  });

  stream.on("end", () => {
    console.log("File parsing complete");
  });
}

When to use manual reading:

  • Parsing binary file formats (images, audio, video)
  • Implementing custom protocols (see the sketch after this list)
  • Need to read exact byte amounts
  • Implementing custom buffering strategies
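
For example, here's a minimal sketch of a length-prefixed protocol reader built on stream.read(size). The framing format (a 4-byte big-endian length followed by that many payload bytes) and the readFrames/onFrame names are illustrative assumptions, not a real protocol:

import { Readable } from "stream";

/**
 * Minimal sketch: read length-prefixed frames with stream.read(size)
 * Assumed frame format: 4-byte big-endian length, then the payload
 */
function readFrames(stream: Readable, onFrame: (payload: Buffer) => void): void {
  let expected: number | null = null; // payload length we're currently waiting for

  stream.on("readable", () => {
    while (true) {
      if (expected === null) {
        const header = stream.read(4) as Buffer | null; // null until 4 bytes are buffered
        if (header === null) return;
        expected = header.readUInt32BE(0);
      }

      const payload = stream.read(expected) as Buffer | null; // null until the full frame arrives
      if (payload === null) return; // 'readable' fires again when more data is buffered

      onFrame(payload);
      expected = null; // ready for the next header
    }
  });
}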

Comparison: 'data' vs 'readable'

Let's see both approaches side by side:

import { createReadStream } from "fs";

// Approach 1: Automatic with 'data'
console.log("=== AUTOMATIC READING ===");
const auto = createReadStream("data.txt");

auto.on("data", (chunk) => {
  // Stream pushes data to you automatically
  console.log(`Auto received: ${chunk.length} bytes`);
  // Process immediately
});

// Approach 2: Manual with 'readable'
console.log("\n=== MANUAL READING ===");
const manual = createReadStream("data.txt");

manual.on("readable", () => {
  // You pull data when ready
  console.log("Data available, pulling now...");

  let chunk;
  while ((chunk = manual.read()) !== null) {
    console.log(`Manually read: ${chunk.length} bytes`);
    // You control when to read next chunk
  }
});

Decision guide:

Use 'data' (automatic) when:

  • Simple sequential processing
  • Processing can keep up with the stream
  • You want the simplest code
  • Standard file reading

Use 'readable' (manual) when:

  • You need exact byte amounts
  • Complex parsing logic
  • Implementing protocols
  • Custom buffering is needed

Creating Custom Readable Streams

Now let's discover how to create your own readable streams. This is powerful for turning any data source into a stream.

Your First Custom Stream

import { Readable } from "stream";

/**
 * Custom stream that generates numbers
 * Great for understanding stream internals
 */
class NumberGeneratorStream extends Readable {
  private current: number;
  private max: number;

  constructor(max: number) {
    // Call parent constructor
    super({
      encoding: "utf-8",
      highWaterMark: 16 * 1024,
    });

    this.current = 1;
    this.max = max;
  }

  /**
   * This method is called automatically when stream needs data
   * You MUST implement this method
   */
  _read(size: number): void {
    console.log(`_read() called, requesting ${size} bytes`);

    // Generate next number
    if (this.current <= this.max) {
      const number = this.current;
      this.current++;

      // Push data into the stream
      const data = `Number: ${number}\n`;
      const canContinue = this.push(data);

      console.log(`Pushed: "${data.trim()}"`);
      console.log(`Can continue pushing: ${canContinue}`);

      // If canContinue is false, backpressure is applied
      // Stream will call _read() again when buffer has space
    } else {
      // Signal end of stream by pushing null
      this.push(null);
      console.log("Stream ended (pushed null)");
    }
  }
}

// Usage
const stream = new NumberGeneratorStream(5);

stream.on("data", (chunk) => {
  console.log(`Consumer received: ${chunk.toString().trim()}`);
});

stream.on("end", () => {
  console.log("✅ All numbers generated");
});

What's happening inside:

  1. Consumer reads data (attaches 'data' listener)
  2. Stream buffer is empty, so _read() is called
  3. You call this.push(data) to add data to buffer
  4. Buffer fills up, push() returns false (backpressure)
  5. Consumer processes data, buffer empties
  6. _read() called again automatically
  7. When done, push null to signal end

Understanding this.push()

The push() method is the heart of custom streams:

import { Readable } from "stream";

class DemoStream extends Readable {
  _read(): void {
    // Push returns true if buffer has space
    const canContinue1 = this.push("First chunk\n");
    console.log(`After push 1: ${canContinue1}`); // true

    const canContinue2 = this.push("Second chunk\n");
    console.log(`After push 2: ${canContinue2}`); // true

    // Keep pushing until buffer is full
    const canContinue3 = this.push("Third chunk\n");
    console.log(`After push 3: ${canContinue3}`); // might be false

    if (!canContinue3) {
      console.log("⏸️ Backpressure! Stop pushing, wait for _read() call");
      // Don't push more - wait for next _read() call
      return;
    }

    // Signal end
    this.push(null);
  }
}

Key rules for this.push():

  • Returns true if buffer has space (keep pushing)
  • Returns false if buffer is full (stop and wait)
  • Push null to signal end of stream
  • Never push after pushing null

Real-World Example: API Data Stream

Let's create a stream that fetches paginated API data:

import { Readable } from "stream";

/**
 * Stream that fetches paginated data from an API
 * Automatically handles pagination
 */
class ApiDataStream extends Readable {
  private currentPage: number;
  private readonly maxPages: number;
  private readonly apiUrl: string;
  private isFetching: boolean;
  private pendingItems: unknown[]; // items fetched but not yet pushed (backpressure)

  constructor(apiUrl: string, maxPages: number = 10) {
    super({
      objectMode: true, // Stream objects instead of strings/buffers
      highWaterMark: 5, // Buffer up to 5 objects
    });

    this.apiUrl = apiUrl;
    this.currentPage = 1;
    this.maxPages = maxPages;
    this.isFetching = false;
    this.pendingItems = [];
  }

  async _read(): Promise<void> {
    // First, drain items left over from the previous fetch
    while (this.pendingItems.length > 0) {
      if (!this.push(this.pendingItems.shift())) {
        // Backpressure - _read() will be called again when there's space
        return;
      }
    }

    // Prevent concurrent fetches
    if (this.isFetching) {
      return;
    }

    // Check if we're done
    if (this.currentPage > this.maxPages) {
      this.push(null); // End stream
      return;
    }

    this.isFetching = true;

    try {
      console.log(`Fetching page ${this.currentPage}...`);

      // Fetch data from API
      const response = await fetch(`${this.apiUrl}?page=${this.currentPage}`);

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      const data = await response.json();
      this.pendingItems = data.items;
      this.currentPage++;

      // Push as many items as backpressure allows; the rest stay buffered
      while (this.pendingItems.length > 0) {
        if (!this.push(this.pendingItems.shift())) {
          console.log("⏸️ Backpressure applied, pausing push");
          break;
        }
      }

      console.log(`✅ Page ${this.currentPage - 1} complete`);
    } catch (error) {
      console.error("Error fetching data:", error);
      this.destroy(error as Error);
    } finally {
      this.isFetching = false;
    }
  }
}

// Usage
const stream = new ApiDataStream("https://api.example.com/data", 3);

stream.on("data", (item) => {
  console.log("Received item:", item);
  // Process each item
});

stream.on("end", () => {
  console.log("All pages fetched");
});

stream.on("error", (error) => {
  console.error("Stream error:", error);
});

What makes this powerful:

  • Automatically handles pagination
  • Respects backpressure (won't fetch faster than you can process)
  • Error handling built-in
  • Memory efficient (streams data instead of loading all pages)

Object Mode Streams

Notice the objectMode: true option? This lets streams transport JavaScript objects instead of just strings/buffers:

import { Readable } from "stream";

interface User {
  id: number;
  name: string;
  email: string;
}

/**
 * Stream that generates user objects
 */
class UserStream extends Readable {
  private userId: number;
  private readonly maxUsers: number;

  constructor(maxUsers: number) {
    super({
      objectMode: true, // Enable object mode
      highWaterMark: 100, // Buffer 100 objects
    });

    this.userId = 1;
    this.maxUsers = maxUsers;
  }

  _read(): void {
    if (this.userId > this.maxUsers) {
      this.push(null); // Done
      return;
    }

    // Push JavaScript objects directly
    const user: User = {
      id: this.userId,
      name: `User ${this.userId}`,
      email: `user${this.userId}@example.com`,
    };

    this.push(user);
    this.userId++;
  }
}

// Usage
const stream = new UserStream(5);

stream.on("data", (user: User) => {
  // Receive typed objects
  console.log(`User ${user.id}: ${user.name} (${user.email})`);
});

When to use object mode:

  • Processing database records
  • Transforming API responses
  • Building data processing pipelines
  • Any time you're working with structured data
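
Object-mode streams also work with async iteration (covered in more detail below). Here's a quick sketch that reuses the UserStream class from above; collectUsers is just an illustrative helper name:

// Quick sketch: consuming the object-mode UserStream above with for await...of
async function collectUsers(): Promise<User[]> {
  const users: User[] = [];

  for await (const user of new UserStream(5)) {
    users.push(user); // each chunk is a User object, not a Buffer
  }

  return users;
}

collectUsers().then((users) => console.log(`Collected ${users.length} users`));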

Simple Stream Creation with Readable.from()

For simpler cases, you don't need a full class:

import { Readable } from "stream";

// Create stream from array
// (Readable.from() defaults to object mode, so no encoding option is needed)
const arrayStream = Readable.from(["Hello", "World", "!"]);

arrayStream.on("data", (chunk) => {
  console.log(`Chunk: ${chunk}`);
});

// Create stream from async generator
async function* generateData() {
  for (let i = 1; i <= 5; i++) {
    // Simulate async data fetching
    await new Promise((resolve) => setTimeout(resolve, 100));
    yield `Data ${i}`;
  }
}

const asyncStream = Readable.from(generateData());

asyncStream.on("data", (chunk) => {
  console.log(`Async chunk: ${chunk}`);
});

// Create stream from async iterator
async function* fetchPages() {
  let page = 1;
  while (page <= 3) {
    const response = await fetch(`https://api.example.com/data?page=${page}`);
    const data = await response.json();
    yield data;
    page++;
  }
}

const apiStream = Readable.from(fetchPages(), {
  objectMode: true,
});

// Top-level for await...of works in ES modules
for await (const page of apiStream) {
  console.log("Page:", page);
}

This is the modern, clean way to create simple streams!

Using Streams with Async Iterators

Modern JavaScript provides elegant ways to work with streams:

import { createReadStream } from "fs";
import { createInterface } from "readline";

/**
 * Process file line by line using async iteration
 * This is the cleanest approach for line-based processing!
 */
async function processFileLines(filePath: string): Promise<void> {
  const stream = createReadStream(filePath);
  const rl = createInterface({ input: stream });

  let lineNumber = 0;

  // Clean async/await loop - no event listeners needed!
  for await (const line of rl) {
    lineNumber++;

    console.log(`Line ${lineNumber}: ${line.substring(0, 50)}...`);

    // Can use await inside loop
    await processLine(line);

    // Automatic backpressure handling!
  }

  console.log(`✅ Processed ${lineNumber} lines`);
}

async function processLine(line: string): Promise<void> {
  // Simulate async processing
  return new Promise((resolve) => setTimeout(resolve, 10));
}

// Usage
processFileLines("large-file.txt")
  .then(() => console.log("Done"))
  .catch((error) => console.error("Error:", error));

Why async iterators are amazing:

  • No manual event listener management
  • Automatic backpressure (loop waits for await)
  • Clean, readable code
  • Built-in error handling with try/catch

Converting Streams to Async Iterables

Any readable stream can be used as an async iterable:

import { createReadStream } from "fs";

async function processStream() {
  const stream = createReadStream("data.txt", {
    encoding: "utf-8",
    highWaterMark: 64 * 1024,
  });

  try {
    // Iterate over chunks
    for await (const chunk of stream) {
      console.log(`Processing ${chunk.length} bytes`);

      // Automatically paused while this code runs
      await heavyProcessing(chunk);

      // Automatically resumed after await completes
    }

    console.log("✅ Stream processing complete");
  } catch (error) {
    console.error("Error processing stream:", error);
  }
}

async function heavyProcessing(data: string): Promise<void> {
  // Simulate time-consuming work
  return new Promise((resolve) => setTimeout(resolve, 100));
}

Piping Streams: Connecting the Pieces

One of the most powerful features of streams is piping - connecting a readable stream directly to a writable stream:

Basic Piping

import { createReadStream, createWriteStream } from "fs";

// Simple file copy using pipes
const readStream = createReadStream("input.txt");
const writeStream = createWriteStream("output.txt");

// Connect them
readStream.pipe(writeStream);

writeStream.on("finish", () => {
  console.log("✅ File copied successfully");
});

writeStream.on("error", (error) => {
  console.error("Error writing:", error);
});

What piping does automatically:

  • Handles backpressure (pauses reading when writing is slow)
  • Much cleaner than manual event handling

What piping does NOT do automatically: .pipe() does not forward errors from the readable to the writable, and it does not destroy the other stream when one side fails. Each stream needs its own 'error' listener (see the sketch below), which is exactly why pipeline() in the next section is usually the better choice.
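
Here's a minimal sketch of the extra error wiring .pipe() needs, using the same readStream/writeStream setup as above:

import { createReadStream, createWriteStream } from "fs";

const readStream = createReadStream("input.txt");
const writeStream = createWriteStream("output.txt");

readStream.pipe(writeStream);

// Errors are NOT forwarded by .pipe() - handle each stream separately
readStream.on("error", (error) => {
  console.error("Read error:", error);
  writeStream.destroy(error); // stop writing and clean up the destination
});

writeStream.on("error", (error) => {
  console.error("Write error:", error);
  readStream.destroy(); // stop reading; no point producing more data
});

writeStream.on("finish", () => {
  console.log("✅ Copy complete");
});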

Multi-Stage Pipelines

You can chain multiple streams together:

import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
import { pipeline } from "stream/promises";

/**
 * Compress a file using stream pipeline
 * Read → Compress → Write
 */
async function compressFile(input: string, output: string): Promise<void> {
  try {
    await pipeline(
      createReadStream(input),
      createGzip(), // Transform stream (compresses)
      createWriteStream(output)
    );

    console.log("✅ File compressed successfully");
  } catch (error) {
    console.error("❌ Compression failed:", error);
    throw error;
  }
}

// Usage
compressFile("large-file.txt", "large-file.txt.gz")
  .then(() => console.log("Done"))
  .catch((error) => console.error(error));

Why pipeline() is better than .pipe():

  • Returns a Promise (works with async/await)
  • Better error handling
  • Automatic cleanup on errors
  • Recommended for modern code
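
pipeline() also accepts an AbortSignal in its options, so a slow or stuck pipeline can be cancelled. Here's a minimal sketch; the 5-second timeout and file names are illustrative:

import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
import { pipeline } from "stream/promises";

async function compressWithTimeout(): Promise<void> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), 5000); // give up after 5 seconds

  try {
    await pipeline(
      createReadStream("input.txt"),
      createGzip(),
      createWriteStream("output.txt.gz"),
      { signal: controller.signal } // aborting rejects the pipeline and destroys the streams
    );
    console.log("✅ Compressed before the timeout");
  } catch (error) {
    if ((error as Error).name === "AbortError") {
      console.error("⏱️ Pipeline aborted by timeout");
    } else {
      throw error;
    }
  } finally {
    clearTimeout(timer);
  }
}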

Real-World Pipeline Example

import { createReadStream, createWriteStream } from "fs";
import { createGzip } from "zlib";
import { Transform } from "stream";
import { pipeline } from "stream/promises";

/**
 * Transform stream that converts text to uppercase
 */
class UpperCaseTransform extends Transform {
  _transform(
    chunk: Buffer,
    encoding: string,
    callback: (error?: Error, data?: any) => void
  ): void {
    try {
      // Convert to uppercase
      const upperChunk = chunk.toString().toUpperCase();

      // Push transformed data
      this.push(upperChunk);

      // Signal completion
      callback();
    } catch (error) {
      callback(error as Error);
    }
  }
}

/**
 * Complex pipeline: Read → Uppercase → Compress → Write
 */
async function processFile(): Promise<void> {
  await pipeline(
    createReadStream("input.txt"),
    new UpperCaseTransform(),
    createGzip(),
    createWriteStream("output.txt.gz")
  );
}

Performance Tuning

Choosing the Right highWaterMark

The buffer size dramatically affects performance:

import { createReadStream } from "fs";
import { performance } from "perf_hooks";

async function testBufferSize(
  filePath: string,
  bufferSize: number
): Promise<void> {
  return new Promise((resolve, reject) => {
    const startTime = performance.now();
    let bytesRead = 0;
    let chunksReceived = 0;

    const stream = createReadStream(filePath, {
      highWaterMark: bufferSize,
    });

    stream.on("data", (chunk) => {
      bytesRead += chunk.length;
      chunksReceived++;
    });

    stream.on("error", reject);

    stream.on("end", () => {
      const duration = performance.now() - startTime;
      const megabytesPerSecond = bytesRead / (1024 * 1024) / (duration / 1000);

      console.log(`\nBuffer: ${bufferSize / 1024}KB`);
      console.log(`Time: ${duration.toFixed(2)}ms`);
      console.log(`Chunks: ${chunksReceived}`);
      console.log(`Throughput: ${megabytesPerSecond.toFixed(2)} MB/s`);

      resolve();
    });
  });
}

// Test different sizes
async function runPerformanceTests() {
  const file = "test-file-100mb.txt";

  await testBufferSize(file, 16 * 1024); // 16 KB
  await testBufferSize(file, 64 * 1024); // 64 KB (default)
  await testBufferSize(file, 256 * 1024); // 256 KB
  await testBufferSize(file, 1024 * 1024); // 1 MB
}

runPerformanceTests().catch((error) => console.error(error));

Results (typical):

Buffer Size | Chunks | Speed   | Memory   | Best For
----------- | ------ | ------- | -------- | -------------------
16 KB       | 6,400  | Slower  | Minimal  | Embedded systems
64 KB       | 1,600  | Good    | Low      | General purpose
256 KB      | 400    | Faster  | Moderate | Large files
1 MB        | 100    | Fastest | Higher   | Maximum throughput

Recommendation: Start with 64KB (default), profile your application, then adjust if needed.

Summary: Key Takeaways

Manual Reading:

  • ✅ Use 'readable' event for precise control
  • ✅ Call stream.read(size) to pull specific amounts
  • ✅ Best for binary formats and custom protocols
  • ✅ More complex but more powerful

Creating Custom Streams:

  • ✅ Extend Readable class
  • ✅ Implement _read() method
  • ✅ Use this.push(data) to add data
  • ✅ Push null to signal end
  • ✅ Respect backpressure (when push() returns false)

Modern Approaches:

  • ✅ Use Readable.from() for simple streams
  • ✅ Use async iterators for clean code
  • ✅ Use pipeline() instead of .pipe()
  • ✅ Leverage TypeScript for type safety

Performance:

  • ✅ Adjust highWaterMark based on use case
  • ✅ Use object mode for structured data
  • ✅ Profile before optimizing
  • ✅ Remember: premature optimization is evil

What's Next?

Continue your stream mastery:

  • Transform Streams - Modify data as it flows
  • Writable Streams - Efficiently write data
  • Duplex Streams - Read and write simultaneously
  • Stream Error Handling - Advanced error strategies

You now have deep understanding of readable streams - from basic usage to creating custom implementations. This knowledge empowers you to build efficient, scalable data processing applications!

Version Information

Tested with:

  • Node.js: v18.x, v20.x, v22.x
  • TypeScript: v5.x