Advanced Stream Redirection: Building Data Processing Pipelines
In the previous article, we discovered how standard streams (stdin, stdout, stderr) let processes communicate. Now we're ready for the exciting part: learning how to orchestrate multiple programs working together through sophisticated stream redirection.
Have you ever wondered how developers build command-line tools that seamlessly work together, passing data through chains of transformations? Or how to redirect input and output in complex ways to build automated data processing workflows? Let's explore these powerful techniques.
Quick Reference
When to use: Building data pipelines, automating workflows, processing large datasets, creating production-ready CLI tools
Basic patterns:
# Combine stdout and stderr
command > output.txt 2>&1
# Split outputs
command > output.txt 2> errors.txt
# Chain multiple processes
cat data.txt | filter | transform | output
Common use cases:
- Processing log files through multiple stages
- Automated data transformation workflows
- Server-side data pipelines
- Development and deployment automation
What You Need to Know First
Required reading:
- Standard Streams in Node.js - You must understand stdin, stdout, and stderr before learning advanced redirection
Helpful background:
- Basic terminal/command-line usage
- Node.js fundamentals (running scripts, installing packages)
- File system operations (reading/writing files)
What We'll Cover in This Article
By the end of this guide, you'll master:
- Complex stream redirection patterns and operators
- Building multi-stage data processing pipelines
- Programmatic stream redirection in Node.js
- Handling edge cases and errors in pipelines
- Real-world pipeline architectures
- Performance considerations for large-scale processing
What We'll Explain Along the Way
We'll explore these concepts with detailed examples:
- File descriptor manipulation (what 2>&1 really means)
- Process substitution (advanced bash technique)
- Backpressure in stream processing
- Error propagation in pipelines
- Resource cleanup and memory management
Understanding Redirection Operators in Depth
Let's dig deeper into redirection operators and discover how they really work at the operating system level.
The > Operator: Redirecting stdout
When you write:
$ node script.ts > output.txt
Here's what happens behind the scenes:
- The shell prepares to launch your Node.js process with its three standard streams
- Before running the program, the shell opens output.txt and assigns it to file descriptor 1
- Now stdout points to the file instead of the terminal
- Your program runs normally, writing to stdout
- All stdout output flows into the file
Your program doesn't know it's writing to a file—it still just writes to stdout. The redirection happens transparently.
Visual representation:
Before: process → stdout (fd 1) → terminal
After: process → stdout (fd 1) → output.txt
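You can observe this transparency from inside Node.js: when stdout is attached to a terminal, process.stdout.isTTY is true; when it's redirected to a file or pipe, the property is undefined. A minimal sketch (the filename is just for illustration):

// where-am-i-writing.ts (illustrative)
// Purpose: Show that a program can detect redirection, but never needs to care about it
if (process.stdout.isTTY) {
  process.stderr.write("stdout is a terminal\n");
} else {
  process.stderr.write("stdout is redirected (file or pipe)\n");
}
console.log("This line goes wherever fd 1 points");

Run it directly and then with > out.txt: the stderr message changes, while the console.log line quietly follows the redirection.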
The >> Operator: Append Instead of Overwrite
$ node script.ts >> output.txt
This works exactly like >, except the file is opened in append mode. New output adds to the end rather than replacing the file's contents.
When to use each:
- Use > when starting fresh (log rotation, new analysis)
- Use >> when accumulating data (continuous logging, aggregating results)
# Day 1: Start fresh log
$ node monitor.ts > system.log
# Day 2-30: Append to existing log
$ node monitor.ts >> system.log
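If the script manages its own log file instead of relying on shell redirection, the same overwrite-vs-append choice is available through the flags option of fs.createWriteStream. A minimal sketch; the log file name is only an example:

// append-log.ts (illustrative)
import fs from "fs";

// flags: "w" behaves like >  (truncate and start fresh)
// flags: "a" behaves like >> (append to the end)
const log = fs.createWriteStream("system.log", { flags: "a" });

log.write(`${new Date().toISOString()} monitor started\n`);
log.end();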
The 2> Operator: Redirecting stderr Specifically
Remember, 2 is the file descriptor for stderr. So 2> says "redirect file descriptor 2."
$ node script.ts 2> errors.txt
This redirects only stderr to the file. stdout still goes to the terminal.
Practical example:
// mixed-output.ts
// Demonstrates: Separate stdout and stderr streams
// Normal output
console.log("Processing record 1");
console.log("Processing record 2");
// Error output
process.stderr.write("Warning: Low memory\n");
// More normal output
console.log("Processing record 3");
Run it with separation:
$ node mixed-output.ts > records.txt 2> warnings.txt
Now check the files:
$ cat records.txt
Processing record 1
Processing record 2
Processing record 3
$ cat warnings.txt
Warning: Low memory
Perfect separation! This pattern is incredibly useful for production systems where you want to:
- Save results in one location
- Save errors/warnings in another
- Monitor each independently
The 2>&1 Pattern: Merging stderr into stdout
This is one of the most powerful—and initially confusing—redirection patterns. Let's decode it step by step.
$ node script.ts > output.txt 2>&1
What 2>&1 means literally:
- 2> - redirect stderr (file descriptor 2)
- &1 - to wherever file descriptor 1 is currently pointing
The order matters! Let's see why:
# ✅ Correct: stdout redirected first, then stderr follows
$ node script.ts > output.txt 2>&1
Step 1: > output.txt # stdout (fd 1) now points to output.txt
Step 2: 2>&1 # stderr (fd 2) now points to where fd 1 points (output.txt)
Result: Both go to output.txt
# ❌ Different behavior: stderr redirected first
$ node script.ts 2>&1 > output.txt
Step 1: 2>&1 # stderr (fd 2) now points to where fd 1 points (terminal)
Step 2: > output.txt # stdout (fd 1) now points to output.txt
Result: stdout to file, stderr to terminal (not what we wanted!)
Visual comparison:
Correct order (> file 2>&1):
stdout → output.txt
stderr → output.txt
Wrong order (2>&1 > file):
stdout → output.txt
stderr → terminal
Why this pattern is useful:
// production-script.ts
// In production, you want complete logs with timestamps
console.log("[INFO] Starting process");
process.stderr.write("[ERROR] Configuration missing\n");
console.log("[INFO] Using defaults");
Capture everything:
$ node production-script.ts > complete.log 2>&1
Now complete.log contains all output in chronological order:
[INFO] Starting process
[ERROR] Configuration missing
[INFO] Using defaults
The &> Shortcut (Bash Only)
In Bash, you can use a shortcut:
$ node script.ts &> output.txt
This is equivalent to:
$ node script.ts > output.txt 2>&1
Note: The &> operator doesn't exist in all shells (it's Bash-specific). The > file 2>&1 pattern works everywhere and is more portable.
The < Operator: Input Redirection
We can also redirect input from files:
$ node process.ts < input.txt
This sends the file's contents to your program's stdin.
Example program:
// process.ts
// Purpose: Process data from stdin
// Works with: keyboard input, file redirection, or pipes
let totalLines = 0;
process.stdin.on("data", (chunk: Buffer) => {
const text = chunk.toString();
const lines = text.split("\n").filter((line) => line.trim());
totalLines += lines.length;
});
process.stdin.on("end", () => {
console.log(`Processed ${totalLines} lines`);
});
Now you can use it multiple ways:
# From a file
$ node process.ts < data.txt
Processed 1000 lines
# From keyboard (type Ctrl+D when done)
$ node process.ts
[type your data]
Processed 5 lines
# From another command
$ cat data.txt | node process.ts
Processed 1000 lines
Building Complex Data Pipelines
Now let's explore the real power: chaining multiple processes together through pipes. Each program does one thing well, and together they accomplish complex tasks.
The Pipe Operator: Connecting Processes
The pipe operator | connects the stdout of one process to the stdin of the next:
$ command1 | command2 | command3
Data flow:
command1 → stdout → stdin → command2 → stdout → stdin → command3 → stdout → terminal
Real-World Pipeline Example: Log Analysis
Let's build a practical log analysis pipeline step by step.
Scenario: You have a web server log file and need to:
- Extract lines containing errors
- Count how many errors occurred
- Find the most common error messages
- Save results to a report
The data (server.log):
2024-11-05 10:23:45 INFO User logged in
2024-11-05 10:24:12 ERROR Database connection failed
2024-11-05 10:24:15 INFO Request processed
2024-11-05 10:25:03 ERROR Database connection failed
2024-11-05 10:26:18 ERROR File not found: config.json
2024-11-05 10:27:22 INFO User logged out
2024-11-05 10:28:45 ERROR Database connection failed
Step 1: Extract error lines
$ cat server.log | grep "ERROR"
2024-11-05 10:24:12 ERROR Database connection failed
2024-11-05 10:25:03 ERROR Database connection failed
2024-11-05 10:26:18 ERROR File not found: config.json
2024-11-05 10:28:45 ERROR Database connection failed
Step 2: Extract just the error messages
$ cat server.log | grep "ERROR" | cut -d' ' -f4-
Database connection failed
Database connection failed
File not found: config.json
Database connection failed
The cut command splits each line by spaces (-d' ') and takes fields 4 onwards (-f4-).
Step 3: Sort error messages together
$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort
Database connection failed
Database connection failed
Database connection failed
File not found: config.json
Step 4: Count occurrences of each error
$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c
3 Database connection failed
1 File not found: config.json
Step 5: Sort by frequency (most common first)
$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn
3 Database connection failed
1 File not found: config.json
Step 6: Save to a report
$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn > error-report.txt
All in one command:
$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn > error-report.txt
This is the power of pipes! Each command is simple, but together they perform sophisticated analysis.
Building Node.js Pipeline Components
Let's create Node.js programs designed to work in pipelines. The key principles:
- Read from stdin (don't hardcode input files)
- Write data to stdout (for the next stage)
- Write messages to stderr (so they don't contaminate data)
- Exit gracefully when input ends
Example 1: JSON Formatter
// json-format.ts
// Purpose: Format JSON data for readability
// Usage: cat data.json | node json-format.ts
let inputData = "";
process.stdin.on("data", (chunk: Buffer) => {
inputData += chunk.toString();
});
process.stdin.on("end", () => {
try {
// Parse JSON
const parsed = JSON.parse(inputData);
// Format with 2-space indentation
const formatted = JSON.stringify(parsed, null, 2);
// Send to stdout for next stage or terminal
process.stdout.write(formatted + "\n");
// Log success to stderr (visible but doesn't affect data)
process.stderr.write(`✓ Formatted ${Object.keys(parsed).length} keys\n`);
} catch (error) {
// Errors go to stderr
process.stderr.write(`✗ Invalid JSON: ${error.message}\n`);
process.exit(1);
}
});
Example 2: CSV to JSON Converter
// csv-to-json.ts
// Purpose: Convert CSV to JSON format
// Usage: cat data.csv | node csv-to-json.ts
let headers: string[] = [];
const rows: Record<string, string>[] = [];
let isFirstLine = true;
let leftover = ""; // holds a partial line when a chunk ends mid-line

function handleLine(line: string): void {
  if (!line.trim()) return;
  if (isFirstLine) {
    // First line is headers
    headers = line.split(",").map((h) => h.trim());
    isFirstLine = false;
    process.stderr.write(`Found headers: ${headers.join(", ")}\n`);
  } else {
    // Data rows
    const values = line.split(",").map((v) => v.trim());
    const row: Record<string, string> = {};
    headers.forEach((header, index) => {
      row[header] = values[index] || "";
    });
    rows.push(row);
  }
}

process.stdin.on("data", (chunk: Buffer) => {
  // Prepend any partial line carried over from the previous chunk
  const lines = (leftover + chunk.toString()).split("\n");
  leftover = lines.pop() ?? "";
  lines.forEach(handleLine);
});

process.stdin.on("end", () => {
  // Handle the final line if the input didn't end with a newline
  handleLine(leftover);
  // Output JSON to stdout
  process.stdout.write(JSON.stringify(rows, null, 2) + "\n");
  // Stats to stderr
  process.stderr.write(`✓ Converted ${rows.length} rows\n`);
});
Example 3: Data Filter
// filter-large.ts
// Purpose: Filter JSON objects by size
// Usage: cat data.csv | node csv-to-json.ts | node filter-large.ts
let inputData = "";
process.stdin.on("data", (chunk: Buffer) => {
inputData += chunk.toString();
});
process.stdin.on("end", () => {
try {
const data = JSON.parse(inputData);
// Filter: keep only items with size > 100
const filtered = data.filter((item: any) => {
return item.size && parseInt(item.size) > 100;
});
// Output filtered data
process.stdout.write(JSON.stringify(filtered, null, 2) + "\n");
// Stats to stderr
process.stderr.write(
`Filtered ${data.length} → ${filtered.length} items\n`
);
} catch (error) {
process.stderr.write(`Error: ${error.message}\n`);
process.exit(1);
}
});
Building a complete pipeline:
$ cat data.csv | node csv-to-json.ts | node filter-large.ts | node json-format.ts > results.json
Found headers: name, size, date
✓ Converted 150 rows
Filtered 150 → 23 items
✓ Formatted 23 keys
Notice how status messages appear on your terminal (stderr) while data flows through the pipeline to the output file (stdout).
Advanced Redirection Techniques
Splitting Output to Multiple Destinations
Sometimes you want to see output on screen AND save it to a file. The tee command does this:
$ node script.ts | tee output.txt
tee reads stdin, writes to stdout (so you see it), and also writes to the file.
Real-world example:
# Run tests, see results, and save log
$ npm test | tee test-results.txt
# Watch logs while saving them
$ node server.ts | tee server.log
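The same idea is easy to reproduce inside a Node.js stage when you want data to keep flowing down the pipeline while a copy lands in a file. A minimal tee-like sketch (the output filename is just an example):

// tee-lite.ts (illustrative)
// Purpose: Pass stdin through to stdout while also saving a copy to a file
import fs from "fs";

const copy = fs.createWriteStream("copy.log");

process.stdin.on("data", (chunk: Buffer) => {
  process.stdout.write(chunk); // keep the pipeline flowing
  copy.write(chunk);           // save a copy
});

process.stdin.on("end", () => copy.end());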
Process Substitution (Advanced Bash)
Process substitution lets you use command output as if it were a file.
$ diff <(command1) <(command2)
The <(command) syntax runs the command and makes its output available as a temporary file.
Example: Compare outputs of two commands
$ diff <(node version1.ts) <(node version2.ts)
This compares the output of two different script versions without creating temporary files manually.
Named Pipes (FIFOs)
Named pipes let processes communicate through the filesystem:
# Create a named pipe
$ mkfifo mypipe
# Terminal 1: Write to the pipe
$ node producer.ts > mypipe
# Terminal 2: Read from the pipe
$ node consumer.ts < mypipe
The producer blocks until the consumer starts reading, creating synchronization between processes.
Use case: Streaming data between long-running processes without filling disk space with intermediate files.
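The producer and consumer referenced above don't need any FIFO-specific code; they just write to stdout and read from stdin as usual. One possible pair of sketches:

// producer.ts (one possible implementation)
// Emits a line per second, then stops
let n = 0;
const timer = setInterval(() => {
  process.stdout.write(`event ${++n}\n`);
  if (n === 5) clearInterval(timer);
}, 1000);

// consumer.ts (one possible implementation)
// Reports each chunk it receives on stderr
process.stdin.on("data", (chunk: Buffer) => {
  process.stderr.write(`received: ${chunk.toString().trim()}\n`);
});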
Programmatic Stream Redirection in Node.js
Sometimes you need to redirect streams from within your Node.js program. Let's explore how to do this programmatically.
Redirecting stdout to a File
// redirect-stdout.ts
// Purpose: Redirect stdout to a file programmatically
import fs from "fs";
// Create a writable stream to a file
const logFile = fs.createWriteStream("output.log");
// Save original stdout
const originalStdout = process.stdout.write;
// Override stdout.write to go to file
process.stdout.write = function (chunk: any): boolean {
return logFile.write(chunk);
};
// Now all console.log goes to the file
console.log("This goes to output.log");
console.log("So does this");
// Restore original stdout if needed
process.stdout.write = originalStdout;
console.log("This goes back to terminal");
Capturing Child Process Output
// capture-output.ts
// Purpose: Run a command and capture its output
import { spawn } from "child_process";
function runCommand(command: string, args: string[]): Promise<string> {
return new Promise((resolve, reject) => {
const child = spawn(command, args);
let output = "";
let errorOutput = "";
// Capture stdout
child.stdout.on("data", (chunk: Buffer) => {
output += chunk.toString();
});
// Capture stderr
child.stderr.on("data", (chunk: Buffer) => {
errorOutput += chunk.toString();
});
// Handle process completion
child.on("close", (code: number) => {
if (code === 0) {
resolve(output);
} else {
reject(new Error(`Command failed with code ${code}: ${errorOutput}`));
}
});
});
}
// Usage
async function main() {
try {
const output = await runCommand("ls", ["-la"]);
console.log("Command output:", output);
} catch (error) {
console.error("Command failed:", error);
}
}
main();
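Capturing isn't the only option: spawn's stdio setting also lets you redirect a child's streams without touching the data, for example inheriting stdout while still watching stderr. A minimal sketch:

// inherit-stdout.ts (illustrative)
import { spawn } from "child_process";

// stdio entries are [stdin, stdout, stderr]
// "inherit" forwards the child's stdout straight to ours; "pipe" lets us read its stderr
const child = spawn("ls", ["-la"], {
  stdio: ["ignore", "inherit", "pipe"],
});

child.stderr?.on("data", (chunk: Buffer) => {
  process.stderr.write(`[child] ${chunk.toString()}`);
});

child.on("close", (code) => {
  process.stderr.write(`child exited with code ${code}\n`);
});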
Building a Custom Pipeline
// pipeline-builder.ts
// Purpose: Chain Node.js scripts together programmatically
import { spawn } from "child_process";
interface PipelineStage {
command: string;
args: string[];
}
function buildPipeline(stages: PipelineStage[]): Promise<string> {
return new Promise((resolve, reject) => {
if (stages.length === 0) {
return resolve("");
}
// Spawn all processes
const processes = stages.map((stage) => spawn(stage.command, stage.args));
// Connect pipes: stdout of each to stdin of next
for (let i = 0; i < processes.length - 1; i++) {
processes[i].stdout.pipe(processes[i + 1].stdin);
}
// Capture final output
let output = "";
const lastProcess = processes[processes.length - 1];
lastProcess.stdout.on("data", (chunk: Buffer) => {
output += chunk.toString();
});
// Handle errors
processes.forEach((proc, index) => {
proc.on("error", (error) => {
reject(new Error(`Stage ${index + 1} failed: ${error.message}`));
});
});
// Handle completion
lastProcess.on("close", (code: number) => {
if (code === 0) {
resolve(output);
} else {
reject(new Error(`Pipeline failed with code ${code}`));
}
});
});
}
// Usage example
async function analyzeLogs() {
const pipeline: PipelineStage[] = [
{ command: "cat", args: ["server.log"] },
{ command: "grep", args: ["ERROR"] },
{ command: "sort", args: [] },
{ command: "uniq", args: ["-c"] },
];
try {
const result = await buildPipeline(pipeline);
console.log("Analysis complete:");
console.log(result);
} catch (error) {
console.error("Pipeline failed:", error);
}
}
analyzeLogs();
Performance Considerations
Memory Usage: Streams vs Buffers
When processing large files, stream-based approaches use constant memory while buffer-based approaches scale with file size.
Memory comparison:
// ❌ Bad: Loads entire file into memory
import fs from "fs";
// For a 1GB file, this uses ~1GB of memory
const content = fs.readFileSync("large-file.txt", "utf8");
const lines = content.split("\n");
console.log(`Lines: ${lines.length}`);
// ✅ Good: Streams data in chunks
import fs from "fs";
import readline from "readline";
// For a 1GB file, this still uses only a few megabytes of memory
const stream = fs.createReadStream("large-file.txt");
const rl = readline.createInterface({ input: stream });
let lineCount = 0;
rl.on("line", () => lineCount++);
rl.on("close", () => console.log(`Lines: ${lineCount}`));
Performance table:
| File Size | Buffer Approach | Stream Approach |
|---|---|---|
| 10 MB | 10 MB memory | 5 MB memory |
| 100 MB | 100 MB memory | 5 MB memory |
| 1 GB | 1 GB memory | 5 MB memory |
| 10 GB | Out of memory ❌ | 5 MB memory ✅ |
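If you want to check numbers like these on your own machine (exact figures vary with Node version and buffer sizes), process.memoryUsage() reports the process's resident set size, which you can log to stderr while data streams through. A rough sketch:

// memory-probe.ts (illustrative)
// Logs resident memory once per second while passing stdin through to stdout
const timer = setInterval(() => {
  const mb = (process.memoryUsage().rss / 1024 / 1024).toFixed(1);
  process.stderr.write(`rss: ${mb} MB\n`);
}, 1000);

process.stdin.pipe(process.stdout);
process.stdin.on("end", () => clearInterval(timer));

Pipe a large file through it (for example, cat big.log | node memory-probe.ts > /dev/null) and watch the rss figure stay roughly flat.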
Backpressure: Handling Flow Control
When piping streams, the consumer might be slower than the producer. This creates backpressure—data accumulating in memory buffers.
Without backpressure handling:
// ❌ Can cause memory issues with fast sources
const source = fs.createReadStream("huge-file.txt");
const destination = fs.createWriteStream("output.txt");
source.on("data", (chunk) => {
destination.write(chunk); // Might not be ready!
});
With proper backpressure handling:
// ✅ Respects flow control
const source = fs.createReadStream("huge-file.txt");
const destination = fs.createWriteStream("output.txt");
source.on("data", (chunk) => {
const canContinue = destination.write(chunk);
if (!canContinue) {
// Destination buffer is full, pause source
source.pause();
// Resume when destination is ready
destination.once("drain", () => {
source.resume();
});
}
});
Best approach: Use pipe():
// ✅ Best: pipe() handles backpressure automatically
const source = fs.createReadStream("huge-file.txt");
const destination = fs.createWriteStream("output.txt");
source.pipe(destination);
The pipe() method automatically:
- Pauses the source when destination is overwhelmed
- Resumes when destination is ready
- Handles errors properly
- Cleans up resources
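For multi-stage chains, Node also provides stream.pipeline (with a promise-based version in stream/promises), which wires up several streams at once and forwards errors from any stage. A minimal sketch:

// pipeline-copy.ts (illustrative)
import fs from "fs";
import { pipeline } from "stream/promises";

async function copyFile() {
  // pipeline() handles backpressure, error forwarding, and cleanup for the whole chain
  await pipeline(
    fs.createReadStream("huge-file.txt"),
    fs.createWriteStream("output.txt")
  );
  process.stderr.write("copy complete\n");
}

copyFile().catch((error) => {
  process.stderr.write(`pipeline failed: ${error.message}\n`);
  process.exit(1);
});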
Common Misconceptions
❌ Misconception: Pipes work like file copying
Reality: Pipes stream data continuously without creating intermediate files.
Why this matters: Pipes can process files larger than your available disk space because they don't store intermediate results.
Example:
# ❌ Thinking: This creates temporary files
$ cat huge.log | grep "ERROR" | sort > results.txt
# ✅ Reality: Data flows through memory only
# No intermediate files created!
If huge.log is 50GB and you only have 20GB free disk space, this still works because data flows through memory in chunks.
❌ Misconception: Redirecting stdout also redirects console.error()
Reality: console.error() writes to stderr, not stdout, so it isn't affected by > redirection.
Example:
// mixed-logs.ts
console.log("Normal message");
console.error("Error message");
console.log("Another normal message");
$ node mixed-logs.ts > output.txt
Error message # Appears on screen (stderr not redirected)
Check the file:
$ cat output.txt
Normal message
Another normal message
❌ Misconception: A failure in one pipeline stage automatically fails the whole pipeline
Reality: A failing middle stage doesn't necessarily stop or fail the pipeline; you need to handle errors explicitly or they can go unnoticed.
Example:
// failing-stage.ts
process.stdin.on("data", (chunk) => {
// This stage will crash
throw new Error("Oops!");
});
$ echo "test" | node failing-stage.ts | cat
# failing-stage.ts exits with a non-zero code, but the shell reports the exit
# status of the last command (cat), so the failure is easy to miss unless you
# enable set -o pipefail
Solution: Use proper error handling:
process.stdin.on("data", (chunk) => {
try {
// Your processing
} catch (error) {
process.stderr.write(`Error: ${error.message}\n`);
process.exit(1); // Exit with error code
}
});
Troubleshooting Pipeline Issues
Problem: Pipeline produces no output
Symptoms: Command runs but produces empty output.
Common causes:
- A stage in the pipeline filtered everything out (by far the most common)
- Output is buffered and not flushed
- Wrong redirection order
Diagnostic steps:
# Step 1: Test each stage individually
$ cat data.txt # Does source have data?
$ cat data.txt | grep "pattern" # Does filter match anything?
$ cat data.txt | grep "pattern" | sort # Does sort work?
# Step 2: Check for buffering
$ node script.ts | cat # Force flush with cat
# Step 3: Verify redirection order
$ node script.ts > output.txt 2>&1 # Correct order
Solution: Check each pipeline stage independently to find where data disappears.
Problem: "Broken pipe" error
Symptoms: Error message about broken pipe, pipeline terminates early.
Common cause: A process in the pipeline exits before reading all input from the previous stage.
Example:
$ cat huge-file.txt | head -n 10
cat: write error: Broken pipe
This is actually normal! head reads 10 lines and exits, causing cat to get a broken pipe when trying to write more.
Solution: This is usually harmless. To suppress:
$ cat huge-file.txt 2>/dev/null | head -n 10
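If the producer being cut off is your own Node.js script, you can also handle the EPIPE error on process.stdout and exit quietly, since a downstream consumer closing early usually isn't a real failure. A minimal sketch:

// epipe-safe.ts (illustrative)
process.stdout.on("error", (err: NodeJS.ErrnoException) => {
  if (err.code === "EPIPE") {
    // Downstream (e.g. head) stopped reading; exit without complaining
    process.exit(0);
  }
  throw err;
});

for (let i = 0; i < 1_000_000; i++) {
  process.stdout.write(`line ${i}\n`);
}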
Problem: Pipeline hangs indefinitely
Symptoms: Command starts but never completes.
Common causes:
- Deadlock: a process is waiting for input that never arrives (most common)
- An infinite loop in one stage
- The program waits on stdin when it should be reading from a file
Diagnostic steps:
// Add timeouts to detect hangs
setTimeout(() => {
process.stderr.write("Still waiting for input...\n");
}, 5000);
// Log when data arrives
process.stdin.on("data", (chunk) => {
process.stderr.write(`Received ${chunk.length} bytes\n`);
});
Solution: Check if your program expects input from stdin. If running interactively, consider adding timeout or help message.
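A common guard for that last case is to check process.stdin.isTTY: if stdin is a terminal rather than a pipe or redirected file, nobody is sending data, so print usage instead of waiting forever. A minimal sketch (the usage text and filename are illustrative):

// require-piped-input.ts (illustrative)
if (process.stdin.isTTY) {
  // stdin is an interactive terminal, not a pipe or redirected file
  process.stderr.write("No piped input detected. Usage: cat data.txt | node tool.ts\n");
  process.exit(1);
}

process.stdin.on("data", (chunk: Buffer) => {
  process.stderr.write(`Received ${chunk.length} bytes\n`);
});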
Production-Ready Pipeline Pattern
Here's a template for building robust, production-ready pipeline components:
// production-pipeline-stage.ts
// Purpose: Template for building reliable pipeline components
import { Transform } from "stream";
class ProductionTransform extends Transform {
private processed = 0;
private errors = 0;
constructor() {
super({ objectMode: true });
}
_transform(
chunk: any,
encoding: BufferEncoding,
callback: (error?: Error | null, data?: any) => void
): void {
try {
// Your transformation logic here
const result = this.processChunk(chunk);
this.processed++;
// Send result to next stage
callback(null, result);
} catch (error) {
this.errors++;
// Log error to stderr (doesn't break pipeline)
process.stderr.write(
`Error processing chunk ${this.processed}: ${error.message}\n`
);
// Skip this chunk and continue
callback();
}
}
_flush(callback: (error?: Error | null) => void): void {
// Called when input stream ends
process.stderr.write(
`Pipeline stage complete: ${this.processed} processed, ${this.errors} errors\n`
);
callback();
}
private processChunk(chunk: any): any {
// Implement your transformation here
return chunk;
}
}
// Usage
const transformer = new ProductionTransform();
process.stdin
.pipe(transformer)
.pipe(process.stdout)
.on("error", (error) => {
process.stderr.write(`Pipeline error: ${error.message}\n`);
process.exit(1);
})
.on("finish", () => {
process.stderr.write("Pipeline completed successfully\n");
});
Check Your Understanding
Quick Quiz
- What does 2>&1 do, and why does order matter?

Show Answer

2>&1 redirects stderr (fd 2) to wherever stdout (fd 1) is currently pointing. Order matters because redirections are processed left to right:

# Correct: Both go to file
command > file.txt 2>&1

# Wrong: Only stdout to file, stderr to terminal
command 2>&1 > file.txt

In the correct version, > redirects stdout to the file first, then 2>&1 sends stderr to that same file. In the wrong version, 2>&1 duplicates stdout (which still points to the terminal), then > redirects only stdout to the file.

- Why would you use tee in a pipeline?

Show Answer

tee lets you see output on your terminal AND save it to a file simultaneously.

# Without tee: Only see results, not saved
npm test

# With tee: See results AND save to file
npm test | tee test-results.txt

This is invaluable for long-running processes where you want to monitor progress in real time while also keeping a permanent log.

- When should you use pipe() vs manual data handling?

Show Answer

Use pipe() when:
- Simply transferring data from source to destination
- You need automatic backpressure handling
- You want error propagation and cleanup

Use manual handling when:
- You need to transform or inspect data as it flows
- You're building complex branching logic
- You're aggregating data across chunks

// Use pipe(): Simple transfer
source.pipe(destination);

// Manual handling: Need to transform
source.on("data", (chunk) => {
  const transformed = processChunk(chunk);
  destination.write(transformed);
});
Hands-On Exercise
Challenge: Build a pipeline component that counts word frequency from stdin and outputs the top 10 most common words as JSON.
Requirements:
- Read from stdin
- Count word occurrences (case-insensitive)
- Output top 10 as JSON to stdout
- Log progress to stderr
Starter Code:
// word-frequency.ts
// TODO: Implement word frequency counter
let wordCounts: Record<string, number> = {};
process.stdin.on("data", (chunk: Buffer) => {
// Your code here
});
process.stdin.on("end", () => {
// Output top 10 as JSON
});
Show Solution
// word-frequency.ts
// Solution: Word frequency counter for pipelines
interface WordCount {
word: string;
count: number;
}
let wordCounts: Record<string, number> = {};
let totalWords = 0;
process.stdin.on("data", (chunk: Buffer) => {
const text = chunk.toString();
// Extract words (alphanumeric only)
const words = text.toLowerCase().match(/\b[a-z0-9]+\b/g) || [];
// Count occurrences
words.forEach((word) => {
wordCounts[word] = (wordCounts[word] || 0) + 1;
totalWords++;
});
// Log progress to stderr
process.stderr.write(`Processed ${totalWords} words so far...\n`);
});
process.stdin.on("end", () => {
// Convert to array and sort by count
const sorted: WordCount[] = Object.entries(wordCounts)
.map(([word, count]) => ({ word, count }))
.sort((a, b) => b.count - a.count)
.slice(0, 10);
// Output JSON to stdout
const result = {
total_words: totalWords,
unique_words: Object.keys(wordCounts).length,
top_10: sorted,
};
process.stdout.write(JSON.stringify(result, null, 2) + "\n");
// Final stats to stderr
process.stderr.write(
`✓ Analysis complete: ${totalWords} total, ` +
`${Object.keys(wordCounts).length} unique\n`
);
});
Test it:
# From a file
$ cat article.txt | node word-frequency.ts
# From multiple files
$ cat *.txt | node word-frequency.ts
# In a pipeline
$ cat article.txt | node word-frequency.ts | node json-format.ts > results.json
Why this solution works:
- Streams data incrementally - doesn't load entire file into memory
- Logs to stderr - progress messages don't contaminate JSON output
- Outputs valid JSON to stdout - can be piped to other tools
- Handles edge cases - filters out non-words, handles empty input
- Provides useful metadata - includes total and unique word counts
Real-World Pipeline Architectures
Let's explore how production systems use pipelines to process data at scale.
Architecture 1: ETL (Extract, Transform, Load) Pipeline
Scenario: Processing daily sales data from CSV to database.
# Pipeline stages:
# 1. Extract: Read CSV from S3 or file system
# 2. Transform: Clean, validate, enrich data
# 3. Load: Insert into database
$ cat sales-data.csv \
| node extract-csv.ts \
| node validate-data.ts \
| node enrich-with-metadata.ts \
| node load-to-database.ts \
> processed.log 2> errors.log
Each stage is independent:
// validate-data.ts
// Purpose: Validate and filter invalid records
process.stdin.on("data", (chunk: Buffer) => {
const records = JSON.parse(chunk.toString());
const valid = records.filter((record: any) => {
// Validation rules
if (!record.date || !record.amount) {
process.stderr.write(`Invalid record: ${JSON.stringify(record)}\n`);
return false;
}
return true;
});
process.stdout.write(JSON.stringify(valid) + "\n");
});
Architecture 2: Log Aggregation Pipeline
Scenario: Collecting logs from multiple servers and analyzing patterns.
# Collect from multiple sources
$ (cat server1.log & cat server2.log & cat server3.log) \
| node parse-logs.ts \
| node filter-errors.ts \
| node aggregate-by-hour.ts \
| node generate-report.ts \
> daily-report.html
Parallel input, serial processing:
// aggregate-by-hour.ts
// Purpose: Group log entries by hour
const hourlyBuckets: Record<string, any[]> = {};
process.stdin.on("data", (chunk: Buffer) => {
const entries = JSON.parse(chunk.toString());
entries.forEach((entry: any) => {
const hour = new Date(entry.timestamp).getHours();
const key = `hour-${hour}`;
if (!hourlyBuckets[key]) {
hourlyBuckets[key] = [];
}
hourlyBuckets[key].push(entry);
});
});
process.stdin.on("end", () => {
// Output aggregated data
const summary = Object.entries(hourlyBuckets).map(([hour, entries]) => ({
hour,
count: entries.length,
errors: entries.filter((e: any) => e.level === "ERROR").length,
}));
process.stdout.write(JSON.stringify(summary, null, 2) + "\n");
});
Architecture 3: Real-Time Processing Pipeline
Scenario: Processing streaming data from a message queue or API.
// real-time-pipeline.ts
// Purpose: Process continuous stream of events
import { Transform } from "stream";
// Stage 1: Parse events
class EventParser extends Transform {
  private leftover = ""; // carries a partial line across chunks

  constructor() {
    // Input is raw text (Buffer/string), output is parsed objects
    super({ readableObjectMode: true });
  }

  _transform(chunk: any, encoding: string, callback: Function) {
    try {
      const lines = (this.leftover + chunk.toString()).split("\n");
      this.leftover = lines.pop() ?? "";
      lines.filter(Boolean).forEach((line) => {
        this.push(JSON.parse(line));
      });
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback: Function) {
    try {
      // Handle a final line that arrived without a trailing newline
      if (this.leftover.trim()) {
        this.push(JSON.parse(this.leftover));
      }
      callback();
    } catch (error) {
      callback(error);
    }
  }
}
// Stage 2: Enrich with metadata
class EventEnricher extends Transform {
  constructor() {
    // Objects in, objects out
    super({ objectMode: true });
  }

  _transform(event: any, encoding: string, callback: Function) {
    event.processedAt = new Date().toISOString();
    event.serverId = process.env.SERVER_ID || "unknown";
    this.push(event);
    callback();
  }
}
// Stage 3: Filter important events
class EventFilter extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(event: any, encoding: string, callback: Function) {
    if (event.priority === "high" || event.type === "error") {
      this.push(event);
    }
    callback();
  }
}
// Stage 4: Format output
class EventFormatter extends Transform {
  constructor() {
    // Objects in, newline-delimited JSON text out
    super({ writableObjectMode: true });
  }

  _transform(event: any, encoding: string, callback: Function) {
    const formatted = JSON.stringify(event) + "\n";
    this.push(formatted);
    callback();
  }
}
// Build the pipeline
process.stdin
.pipe(new EventParser())
.pipe(new EventEnricher())
.pipe(new EventFilter())
.pipe(new EventFormatter())
.pipe(process.stdout);
Usage:
# Process from a message queue
$ kafka-consumer --topic events | node real-time-pipeline.ts > filtered-events.log
# Process from API stream
$ curl -N https://api.example.com/stream | node real-time-pipeline.ts
Architecture 4: Distributed Processing
Scenario: Splitting work across multiple processes for parallel processing.
# Split input into 10,000-line chunks, run each chunk through a worker process, and merge the results
$ cat huge-dataset.csv \
| split -l 10000 --filter='node process-chunk.ts' \
| node merge-results.ts \
> final-output.json
Process chunk component:
// process-chunk.ts
// Purpose: Process a chunk of data independently
process.stdin.on("data", (chunk: Buffer) => {
const lines = chunk.toString().split("\n");
const processed = lines.map((line) => {
// Heavy processing here
return expensiveOperation(line);
});
process.stdout.write(JSON.stringify(processed) + "\n");
});
function expensiveOperation(data: string): any {
// Simulate CPU-intensive work
// In real world: ML inference, image processing, etc.
return { processed: data.toUpperCase() };
}
Performance Optimization Tips
1. Use Stream Transforms for Heavy Processing
Instead of accumulating data, process it as it streams:
// ❌ Bad: Accumulate then process
let allData: any[] = [];
process.stdin.on("data", (chunk) => {
allData.push(...JSON.parse(chunk.toString()));
});
process.stdin.on("end", () => {
const processed = allData.map(heavyOperation);
console.log(JSON.stringify(processed));
});
// ✅ Good: Stream processing
import { Transform } from "stream";
const processor = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
const result = heavyOperation(chunk);
callback(null, result);
},
});
process.stdin.pipe(processor).pipe(process.stdout);
2. Optimize Buffer Sizes
Adjust buffer sizes based on your data:
// Default buffer size: 16KB
const small = fs.createReadStream("file.txt");
// Larger buffer for throughput
const large = fs.createReadStream("file.txt", {
highWaterMark: 64 * 1024, // 64KB
});
// Smaller buffer for memory constraints
const tiny = fs.createReadStream("file.txt", {
highWaterMark: 4 * 1024, // 4KB
});
Guidelines:
- Large files, fast disks: 64KB - 256KB buffers
- Network streams: 16KB - 32KB buffers
- Memory constrained: 4KB - 8KB buffers
3. Parallelize Independent Operations
If stages don't depend on each other, run them in parallel:
# Sequential: Slow
$ cat data.txt | process1 | process2 | process3
# Parallel: Fast (if independent)
$ cat data.txt | tee >(process1 > out1) >(process2 > out2) >(process3 > out3)
4. Use Compression for Network Transfers
When piping over networks, compress data:
# Without compression
$ cat large-file.txt | ssh remote-server 'cat > file.txt'
# With compression (much faster over network)
$ cat large-file.txt | gzip | ssh remote-server 'gunzip > file.txt'
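The same compression step can live inside a Node.js pipeline stage using the built-in zlib module, so data is compressed as it streams through rather than in a separate process. A minimal sketch:

// gzip-stage.ts (illustrative)
// Usage: cat large-file.txt | node gzip-stage.ts > large-file.txt.gz
import { createGzip } from "zlib";

process.stdin.pipe(createGzip()).pipe(process.stdout);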
Summary: Key Takeaways
Let's recap the advanced redirection techniques we've explored:
Redirection Operators:
- > redirects stdout (overwrites the file)
- >> redirects stdout (appends to the file)
- 2> redirects stderr only
- 2>&1 merges stderr into stdout (order matters!)
- < redirects a file into stdin
- | pipes stdout to the stdin of the next command
Pipeline Principles:
- Each stage reads from stdin, writes to stdout
- Status messages go to stderr (not stdout)
- Exit with proper error codes
- Handle backpressure in production code
- Use pipe() for automatic resource management
Performance Guidelines:
- Stream data instead of buffering when possible
- Optimize buffer sizes for your use case
- Parallelize independent operations
- Monitor memory usage in long-running pipelines
Production Best Practices:
- Log progress to stderr
- Handle errors gracefully without breaking pipeline
- Provide meaningful exit codes
- Clean up resources properly
- Test each pipeline stage independently
What's Next?
Now that you've mastered stream redirection and pipelines, you're ready to explore:
Advanced Node.js Streams:
- Transform streams for custom data processing
- Duplex streams for bidirectional communication
- Object mode streams for structured data
- Stream error handling and recovery strategies
Related Topics:
- Building CLI tools with libraries like Commander.js
- Process management and monitoring
- Worker threads for CPU-intensive pipeline stages
- Message queues for distributed pipelines
Practical Projects:
- Build a log analysis toolkit
- Create an ETL pipeline for data migration
- Develop a real-time data processing system
- Design a distributed computation framework
You now understand how to orchestrate complex data flows using stream redirection. These patterns form the foundation of Unix-style programming: small, focused tools that work together through standard interfaces. Whether you're processing gigabytes of logs, building data transformation pipelines, or creating developer tools, mastering streams gives you the power to handle data elegantly and efficiently.
Version Information
Tested with:
- Node.js: v18.x, v20.x, v22.x
- Bash: 5.x
- Operating Systems: Linux, macOS, Windows (WSL)
Platform Notes:
- Windows users: Use WSL (Windows Subsystem for Linux) for full bash redirection support
- PowerShell has different redirection syntax - these examples use bash/WSL
- All Node.js code works cross-platform
Stream redirection embodies the Unix philosophy at its finest: powerful, composable tools that do one thing well and work together seamlessly. Master these patterns, and you'll build systems that are elegant, efficient, and maintainable.