Stream Everything: Node.js Streams and Web Streams in Modern JS Apps
The default mental model for data handling is buffering: fetch the data, store it in memory, process it, send it. This works until it doesn't—until you're streaming gigabytes of files, piping LLM responses in real-time, or running on edge runtimes with memory limits measured in megabytes.
Streams are the alternative: process data as it flows, never holding more than a chunk at a time. Modern JavaScript has two streaming APIs—Node.js streams and Web Streams—and understanding both is essential for building performant applications that work everywhere from serverless functions to edge workers.
This is a deep dive into stream architecture, practical patterns for real applications, and how to rewire your thinking from buffered to streaming.
The Streaming Mental Model
┌─────────────────────────────────────────────────────────────────────────────┐
│ BUFFERED vs STREAMING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Buffered (traditional) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Source ──────────────────────────────────────────────► Buffer │ │
│ │ (100MB file) Wait for complete... (100MB RAM) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Process │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Response │ │
│ │ (100MB) │ │
│ │ │ │
│ │ Memory: O(n) where n = data size │ │
│ │ Time to first byte: After complete download │ │
│ │ Risk: OOM on large data │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Streaming │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Source ─┬─► Chunk ─► Process ─► Send ─┬─► Destination │ │
│ │ │ (64KB) │ │ │
│ │ │ │ │ │
│ │ ├─► Chunk ─► Process ─► Send ─┤ │ │
│ │ │ (64KB) │ │ │
│ │ │ │ │ │
│ │ └─► Chunk ─► Process ─► Send ─┘ │ │
│ │ (64KB) │ │
│ │ │ │
│ │ Memory: O(1) - only current chunk │ │
│ │ Time to first byte: Immediate │ │
│ │ Risk: None (constant memory) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ When to stream: │
│ • File uploads/downloads │
│ • LLM/AI responses │
│ • Video/audio processing │
│ • Log processing │
│ • Database exports │
│ • Any data > available memory │
│ • When time-to-first-byte matters │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
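The two columns above can be felt in a few lines of code. In this sketch, `makeSource` is an illustrative stand-in for a real network body such as `response.body`; the buffered version materializes the whole payload at once, while the streaming version only ever holds the current chunk:

```typescript
// Illustrative stand-in for a large network body
function makeSource(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      for (const c of chunks) controller.enqueue(encoder.encode(c));
      controller.close();
    },
  });
}

// Buffered: O(n) memory - the whole payload exists at once
async function buffered(stream: ReadableStream<Uint8Array>): Promise<number> {
  const whole = new Uint8Array(await new Response(stream).arrayBuffer());
  return whole.length;
}

// Streaming: O(1) memory - only the current chunk is alive
async function streamed(stream: ReadableStream<Uint8Array>): Promise<number> {
  let total = 0;
  const reader = stream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) return total;
    total += value.length; // Chunk goes out of scope immediately
  }
}
```

Both produce the same byte count; the difference is how much data is resident in memory while they do it.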
Web Streams API: The Modern Standard
Web Streams are the platform-native streaming API, supported in browsers, Deno, Cloudflare Workers, and Node.js 18+.
The Three Stream Types
// ReadableStream: Source of data
const readable = new ReadableStream({
start(controller) {
// Called once when stream is created
controller.enqueue('Hello');
controller.enqueue(' ');
controller.enqueue('World');
controller.close();
},
});
// WritableStream: Destination for data
const writable = new WritableStream({
write(chunk) {
console.log('Received:', chunk);
},
close() {
console.log('Stream closed');
},
});
// TransformStream: Process data in transit
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
// Piping them together
await readable
.pipeThrough(transform)
.pipeTo(writable);
// Output:
// Received: HELLO
// Received:
// Received: WORLD
// Stream closed
ReadableStream Deep Dive
// Two modes: "default" (pull) and "byob" (bring your own buffer)
// Pull-based ReadableStream (most common)
function createCounterStream(max: number): ReadableStream<number> {
let current = 0;
return new ReadableStream({
// Called when consumer wants more data
pull(controller) {
if (current < max) {
controller.enqueue(current++);
} else {
controller.close();
}
},
// Optional: Called on cancel
cancel(reason) {
console.log('Stream cancelled:', reason);
},
});
}
// Push-based ReadableStream (for external sources)
function createEventStream(eventTarget: EventTarget): ReadableStream<Event> {
  let handler: ((event: Event) => void) | null = null;
  return new ReadableStream({
    start(controller) {
      handler = (event: Event) => {
        controller.enqueue(event);
      };
      eventTarget.addEventListener('data', handler);
    },
    // start() cannot return a cleanup function; teardown belongs in cancel()
    cancel() {
      if (handler) {
        eventTarget.removeEventListener('data', handler);
      }
    },
  });
}
// With backpressure control
function createThrottledStream<T>(
source: AsyncIterable<T>,
highWaterMark = 10
): ReadableStream<T> {
const iterator = source[Symbol.asyncIterator]();
return new ReadableStream(
{
async pull(controller) {
const { value, done } = await iterator.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
},
// Queuing strategy
new CountQueuingStrategy({ highWaterMark })
);
}
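The "byob" mode mentioned at the top of this section deserves a sketch of its own: a byte source (`type: 'bytes'`) can fill a buffer the consumer supplies, avoiding a copy per chunk. Names here are illustrative, with an in-memory `data` array standing in for a file or socket; the `respond(0)` after `close()` releases a still-pending BYOB request:

```typescript
function createByteSource(data: Uint8Array): ReadableStream<Uint8Array> {
  let offset = 0;
  return new ReadableStream({
    type: 'bytes', // Opt in to BYOB requests
    pull(controller) {
      if (offset >= data.length) {
        controller.close();
        // Release any outstanding BYOB request
        controller.byobRequest?.respond(0);
        return;
      }
      const byob = controller.byobRequest;
      if (byob && byob.view) {
        // Fill the consumer-supplied buffer in place - no extra allocation
        const view = byob.view as Uint8Array;
        const n = Math.min(view.byteLength, data.length - offset);
        new Uint8Array(view.buffer, view.byteOffset, n).set(
          data.subarray(offset, offset + n)
        );
        offset += n;
        byob.respond(n);
      } else {
        // Default reader fallback: enqueue a copy
        controller.enqueue(data.slice(offset));
        offset = data.length;
      }
    },
  });
}

// BYOB consumption: the reader hands its own buffer to the stream
const reader = createByteSource(new TextEncoder().encode('streaming'))
  .getReader({ mode: 'byob' });
const { value } = await reader.read(new Uint8Array(16));
console.log(new TextDecoder().decode(value)); // streaming
```

For most applications the default pull mode is enough; BYOB matters when you're moving large binary payloads and allocation pressure shows up in profiles.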
WritableStream Deep Dive
// File writer with chunked uploads
function createUploadStream(uploadUrl: string): WritableStream<Uint8Array> {
let offset = 0;
return new WritableStream({
async write(chunk) {
await fetch(uploadUrl, {
method: 'PATCH',
headers: {
'Content-Range': `bytes ${offset}-${offset + chunk.length - 1}/*`,
},
body: chunk,
});
offset += chunk.length;
},
async close() {
// Finalize upload
await fetch(uploadUrl, {
method: 'POST',
headers: { 'X-Upload-Complete': 'true' },
});
},
abort(reason) {
console.error('Upload aborted:', reason);
// Cancel partial upload
fetch(uploadUrl, { method: 'DELETE' });
},
});
}
// With backpressure signaling
// (Assumes a `db` client with an insertMany() method is in scope)
function createDatabaseWriter(): WritableStream<Record<string, unknown>> {
let batch: Record<string, unknown>[] = [];
const BATCH_SIZE = 100;
return new WritableStream(
{
async write(record) {
batch.push(record);
if (batch.length >= BATCH_SIZE) {
await db.insertMany(batch);
batch = [];
}
},
async close() {
if (batch.length > 0) {
await db.insertMany(batch);
}
},
},
// Size each record as 1 unit, buffer up to 1000
new CountQueuingStrategy({ highWaterMark: 1000 })
);
}
TransformStream: The Power Tool
// JSON Lines parser (ndjson)
function createJsonLinesParser(): TransformStream<string, object> {
let buffer = '';
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
try {
controller.enqueue(JSON.parse(line));
} catch (e) {
controller.error(new Error(`Invalid JSON: ${line}`));
}
}
}
},
flush(controller) {
// Process any remaining data
if (buffer.trim()) {
try {
controller.enqueue(JSON.parse(buffer));
} catch (e) {
controller.error(new Error(`Invalid JSON: ${buffer}`));
}
}
},
});
}
// Text decoder transform
function createTextDecoder(encoding = 'utf-8'): TransformStream<Uint8Array, string> {
const decoder = new TextDecoder(encoding);
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(decoder.decode(chunk, { stream: true }));
},
flush(controller) {
controller.enqueue(decoder.decode());
},
});
}
// Chunked progress reporter
function createProgressReporter(
onProgress: (bytes: number) => void
): TransformStream<Uint8Array, Uint8Array> {
let totalBytes = 0;
return new TransformStream({
transform(chunk, controller) {
totalBytes += chunk.length;
onProgress(totalBytes);
controller.enqueue(chunk);
},
});
}
// Composing transforms
const response = await fetch('/large-file.ndjson');
const objects: object[] = [];
await response.body!
.pipeThrough(createTextDecoder())
.pipeThrough(createJsonLinesParser())
.pipeTo(new WritableStream({
write(obj) {
objects.push(obj);
},
}));
Node.js Streams: The Original
Node.js streams predate Web Streams and expose a different API. Much of the Node.js ecosystem (fs, http, and countless libraries) is still built on them, so you need to read and write both styles.
The Four Stream Types
import {
Readable,
Writable,
Transform,
Duplex,
pipeline,
} from 'stream';
import { promisify } from 'util';
const pipelineAsync = promisify(pipeline);
// Readable
const readable = new Readable({
read(size) {
// Push data or null to end
this.push('chunk of data');
this.push(null); // End stream
},
});
// Writable
const writable = new Writable({
write(chunk, encoding, callback) {
console.log('Received:', chunk.toString());
callback(); // Signal ready for next chunk
},
});
// Transform (Duplex that transforms)
const transform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
},
});
// Duplex (independent read and write)
const duplex = new Duplex({
read(size) {
// Independent read logic
},
write(chunk, encoding, callback) {
// Independent write logic
callback();
},
});
// Pipeline with error handling
await pipelineAsync(readable, transform, writable);
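Worth knowing: `pipeline()` also accepts async generators and plain async functions as stages (Node.js 16+), which is often the lightest way to write a one-off transform. A sketch:

```typescript
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

// Async-generator stage: pipeline() wires iterables, generators, and
// streams together, with backpressure handled for you
async function* upperCase(source: AsyncIterable<string | Buffer>) {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

const received: string[] = [];
await pipeline(
  Readable.from(['hello', ' ', 'world']),
  upperCase,
  // Final stage: an async consumer instead of a Writable
  async (source: AsyncIterable<string>) => {
    for await (const chunk of source) {
      received.push(chunk);
    }
  }
);
// received is now ['HELLO', ' ', 'WORLD']
```

This avoids the ceremony of subclassing `Transform` for simple per-chunk logic.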
Converting Between Node.js and Web Streams
import { Readable, Writable } from 'stream';
// Node.js Readable → Web ReadableStream
// (Simplified: 'data' events ignore the Web stream's backpressure;
// prefer the built-in Readable.toWeb() on Node.js 18+)
function nodeReadableToWeb(nodeReadable: Readable): ReadableStream {
return new ReadableStream({
start(controller) {
nodeReadable.on('data', (chunk) => {
controller.enqueue(chunk);
});
nodeReadable.on('end', () => {
controller.close();
});
nodeReadable.on('error', (err) => {
controller.error(err);
});
},
cancel() {
nodeReadable.destroy();
},
});
}
// Web ReadableStream → Node.js Readable
function webReadableToNode(webReadable: ReadableStream): Readable {
const reader = webReadable.getReader();
return new Readable({
async read() {
const { done, value } = await reader.read();
if (done) {
this.push(null);
} else {
this.push(value);
}
},
});
}
// Node.js 18+ has built-in conversion
import { Readable as NodeReadable } from 'stream';
// Node → Web
const webStream = NodeReadable.toWeb(nodeReadable);
// Web → Node
const nodeStream = NodeReadable.fromWeb(webReadableStream);
Streaming in Edge Runtimes
Edge runtimes (Cloudflare Workers, Vercel Edge, Deno Deploy) have strict constraints: limited memory, no Node.js APIs, execution time limits. Streams are essential.
Edge-Compatible Streaming Pattern
// Cloudflare Worker / Vercel Edge Function
export default {
async fetch(request: Request): Promise<Response> {
// Stream a large file without buffering
const fileResponse = await fetch('https://storage.example.com/large-file.csv');
// Transform on the fly
const transformedBody = fileResponse.body!
.pipeThrough(new TextDecoderStream())
.pipeThrough(createCsvToJsonTransform())
.pipeThrough(new TextEncoderStream());
    return new Response(transformedBody, {
      headers: {
        // No Transfer-Encoding needed: the runtime chunks streamed bodies
        // automatically (and fetch forbids setting that header manually)
        'Content-Type': 'application/json',
      },
    });
},
};
// CSV to JSON transform
function createCsvToJsonTransform(): TransformStream<string, string> {
let headers: string[] = [];
let buffer = '';
let isFirstLine = true;
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.trim()) continue;
if (isFirstLine) {
headers = line.split(',').map(h => h.trim());
isFirstLine = false;
controller.enqueue('[\n');
} else {
const values = line.split(',');
const obj: Record<string, string> = {};
headers.forEach((h, i) => {
obj[h] = values[i]?.trim() || '';
});
controller.enqueue(JSON.stringify(obj) + ',\n');
}
}
},
flush(controller) {
// Handle last line and close array
if (buffer.trim() && !isFirstLine) {
const values = buffer.split(',');
const obj: Record<string, string> = {};
headers.forEach((h, i) => {
obj[h] = values[i]?.trim() || '';
});
controller.enqueue(JSON.stringify(obj) + '\n');
}
controller.enqueue(']');
},
});
}
Server-Sent Events (SSE) Streaming
// Edge-compatible SSE stream
export async function GET(request: Request) {
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
// Send initial connection message
controller.enqueue(encoder.encode('event: connected\ndata: {}\n\n'));
// Stream data from source
const dataSource = getDataSource(); // AsyncIterator
try {
for await (const data of dataSource) {
const sseMessage = `event: message\ndata: ${JSON.stringify(data)}\n\n`;
controller.enqueue(encoder.encode(sseMessage));
}
controller.close();
} catch (error) {
const errorMessage = `event: error\ndata: ${JSON.stringify({ error: String(error) })}\n\n`;
controller.enqueue(encoder.encode(errorMessage));
controller.close();
}
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
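The inline template strings above follow the SSE wire format: an optional `event:` line, one `data:` line per payload line, and a blank-line terminator. A hypothetical `formatSSE` helper centralizes the encoding (the name and shape are illustrative, not part of any API):

```typescript
function formatSSE(event: string, data: unknown): string {
  const payload = typeof data === 'string' ? data : JSON.stringify(data);
  const dataLines = payload
    .split('\n')
    .map(line => `data: ${line}`) // Multi-line payloads need one data: line each
    .join('\n');
  return `event: ${event}\n${dataLines}\n\n`;
}

console.log(JSON.stringify(formatSSE('message', { n: 1 })));
// "event: message\ndata: {\"n\":1}\n\n"
```

On the client, `new EventSource(url)` parses these frames back into `MessageEvent`s dispatched under the `event:` name.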
Streaming LLM Responses in Next.js
The most common streaming use case today: LLM responses that stream token by token.
OpenAI Streaming with Vercel AI SDK
// app/api/chat/route.ts
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';
const openai = new OpenAI();
export async function POST(request: Request) {
const { messages } = await request.json();
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// Convert OpenAI stream to Web ReadableStream
const stream = OpenAIStream(response);
// Return streaming response
return new StreamingTextResponse(stream);
}
Building OpenAI Streaming from Scratch
// Understanding what OpenAIStream does under the hood
interface ChatCompletionChunk {
choices: Array<{
delta: {
content?: string;
};
}>;
}
function createOpenAIStream(
response: AsyncIterable<ChatCompletionChunk>
): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
return new ReadableStream({
async start(controller) {
try {
for await (const chunk of response) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
controller.enqueue(encoder.encode(content));
}
}
controller.close();
} catch (error) {
controller.error(error);
}
},
});
}
// Manual streaming without SDK
export async function POST(request: Request) {
const { messages } = await request.json();
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: 'gpt-4',
messages,
stream: true,
}),
});
// Transform SSE stream to text stream
const transformedStream = response.body!
.pipeThrough(new TextDecoderStream())
.pipeThrough(createSSEParser())
.pipeThrough(createContentExtractor())
.pipeThrough(new TextEncoderStream());
return new Response(transformedStream, {
headers: { 'Content-Type': 'text/plain; charset=utf-8' },
});
}
// Parse SSE format: "data: {...}\n\n"
function createSSEParser(): TransformStream<string, string> {
let buffer = '';
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const messages = buffer.split('\n\n');
buffer = messages.pop() || '';
for (const message of messages) {
const dataLine = message
.split('\n')
.find(line => line.startsWith('data: '));
if (dataLine) {
const data = dataLine.slice(6); // Remove "data: "
if (data !== '[DONE]') {
controller.enqueue(data);
}
}
}
},
});
}
// Extract content from JSON chunks
function createContentExtractor(): TransformStream<string, string> {
return new TransformStream({
transform(chunk, controller) {
try {
const parsed = JSON.parse(chunk);
const content = parsed.choices?.[0]?.delta?.content;
if (content) {
controller.enqueue(content);
}
} catch {
// Ignore parse errors for partial chunks
}
},
});
}
Client-Side Consumption
// React component consuming streaming response
'use client';
import { useState, useCallback } from 'react';
interface Message {
  role: string;
  content: string;
}
export function ChatInterface() {
  const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const sendMessage = useCallback(async (content: string) => {
// Add user message
const userMessage = { role: 'user', content };
setMessages(prev => [...prev, userMessage]);
// Add placeholder for assistant
const assistantMessage = { role: 'assistant', content: '' };
setMessages(prev => [...prev, assistantMessage]);
setIsStreaming(true);
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [...messages, userMessage],
}),
});
if (!response.ok) throw new Error('Request failed');
// Stream the response
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
// Update the last message with new content
setMessages(prev => {
const updated = [...prev];
const lastIndex = updated.length - 1;
updated[lastIndex] = {
...updated[lastIndex],
content: updated[lastIndex].content + text,
};
return updated;
});
}
} catch (error) {
console.error('Streaming error:', error);
} finally {
setIsStreaming(false);
}
}, [messages]);
return (/* ... */);
}
Streaming with Function Calls and Tools
// Complex streaming with tool use
import { OpenAI } from 'openai';
interface StreamCallbacks {
onToken: (token: string) => void;
onToolCall: (tool: string, args: unknown) => Promise<unknown>;
onComplete: () => void;
onError: (error: Error) => void;
}
async function streamWithTools(
messages: OpenAI.ChatCompletionMessageParam[],
tools: OpenAI.ChatCompletionTool[],
callbacks: StreamCallbacks
) {
const openai = new OpenAI();
const response = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages,
tools,
stream: true,
});
  // This sketch tracks a single in-flight tool call; parallel tool calls
  // would need a map keyed by the delta's index
  let currentToolCall: {
    id: string;
    name: string;
    arguments: string;
  } | null = null;
try {
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
// Handle text content
if (delta?.content) {
callbacks.onToken(delta.content);
}
// Handle tool calls
if (delta?.tool_calls) {
for (const toolCallDelta of delta.tool_calls) {
if (toolCallDelta.id) {
// New tool call starting
currentToolCall = {
id: toolCallDelta.id,
name: toolCallDelta.function?.name || '',
arguments: toolCallDelta.function?.arguments || '',
};
} else if (currentToolCall) {
// Continuing tool call arguments
currentToolCall.arguments += toolCallDelta.function?.arguments || '';
}
}
}
// Check if tool call is complete (finish_reason)
if (chunk.choices[0]?.finish_reason === 'tool_calls' && currentToolCall) {
const args = JSON.parse(currentToolCall.arguments);
const result = await callbacks.onToolCall(currentToolCall.name, args);
// Continue conversation with tool result
await streamWithTools(
[
...messages,
{
role: 'assistant',
tool_calls: [{
id: currentToolCall.id,
type: 'function',
function: {
name: currentToolCall.name,
arguments: currentToolCall.arguments,
},
}],
},
{
role: 'tool',
tool_call_id: currentToolCall.id,
content: JSON.stringify(result),
},
],
tools,
callbacks
);
return;
}
}
callbacks.onComplete();
} catch (error) {
callbacks.onError(error as Error);
}
}
Advanced Streaming Patterns
Pattern 1: Multiplexed Streams
// Multiple logical streams over one connection
interface MultiplexedMessage {
channel: string;
data: unknown;
}
function createMultiplexedStream(
channels: Map<string, WritableStreamDefaultWriter>
): WritableStream<MultiplexedMessage> {
return new WritableStream({
async write(message) {
const writer = channels.get(message.channel);
if (writer) {
await writer.write(message.data);
}
},
});
}
// Demultiplexer
function createDemultiplexer(): {
  input: WritableStream<MultiplexedMessage>;
  getChannel: (name: string) => ReadableStream;
} {
  const writers = new Map<string, WritableStreamDefaultWriter>();
  const streams = new Map<string, ReadableStream>();
  function ensureChannel(name: string): void {
    if (!streams.has(name)) {
      const transform = new TransformStream();
      writers.set(name, transform.writable.getWriter());
      streams.set(name, transform.readable);
    }
  }
  const input = new WritableStream<MultiplexedMessage>({
    async write(message) {
      ensureChannel(message.channel);
      await writers.get(message.channel)!.write(message.data);
    },
  });
  return {
    input,
    getChannel(name: string): ReadableStream {
      ensureChannel(name);
      return streams.get(name)!;
    },
  };
}
Pattern 2: Streaming with Backpressure
// Respect consumer speed
async function processWithBackpressure<T>(
source: ReadableStream<T>,
processor: (item: T) => Promise<void>,
concurrency = 1
): Promise<void> {
const reader = source.getReader();
const inFlight: Promise<void>[] = [];
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Add to in-flight
const promise = processor(value).then(() => {
const index = inFlight.indexOf(promise);
if (index > -1) inFlight.splice(index, 1);
});
inFlight.push(promise);
// Wait if at concurrency limit
if (inFlight.length >= concurrency) {
await Promise.race(inFlight);
}
}
// Wait for remaining
await Promise.all(inFlight);
} finally {
reader.releaseLock();
}
}
// Usage
await processWithBackpressure(
dataStream,
async (record) => {
await db.insert(record);
},
10 // Process 10 at a time
);
Pattern 3: Tee with Processing
// Split stream and process differently
async function teeAndProcess<T>(
source: ReadableStream<T>,
processors: Array<(stream: ReadableStream<T>) => Promise<void>>
): Promise<void> {
// Create tees
let current: ReadableStream<T> = source;
const streams: ReadableStream<T>[] = [];
for (let i = 0; i < processors.length - 1; i++) {
const [a, b] = current.tee();
streams.push(a);
current = b;
}
streams.push(current);
// Process in parallel
await Promise.all(
processors.map((processor, i) => processor(streams[i]))
);
}
// Example: Log, save, and analyze simultaneously
await teeAndProcess(eventStream, [
// Logger
async (stream) => {
for await (const event of streamToAsyncIterator(stream)) {
console.log('Event:', event);
}
},
// Database writer
async (stream) => {
await stream.pipeTo(createDatabaseWriter());
},
// Analytics
async (stream) => {
const stats = await computeStats(stream);
await saveStats(stats);
},
]);
// Helper: Stream to AsyncIterator
async function* streamToAsyncIterator<T>(
stream: ReadableStream<T>
): AsyncGenerator<T> {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) return;
yield value;
}
} finally {
reader.releaseLock();
}
}
Pattern 4: Stream Retry and Recovery
// Resumable stream with retry
async function createResumableStream(
fetchFn: (offset: number) => Promise<ReadableStream<Uint8Array>>,
options: { maxRetries?: number; retryDelay?: number } = {}
): Promise<ReadableStream<Uint8Array>> {
const { maxRetries = 3, retryDelay = 1000 } = options;
let offset = 0;
let retries = 0;
let currentReader: ReadableStreamDefaultReader<Uint8Array> | null = null;
return new ReadableStream({
async pull(controller) {
while (true) {
try {
if (!currentReader) {
const stream = await fetchFn(offset);
currentReader = stream.getReader();
}
const { done, value } = await currentReader.read();
if (done) {
controller.close();
return;
}
offset += value.length;
retries = 0; // Reset on success
controller.enqueue(value);
return;
} catch (error) {
currentReader = null;
if (retries >= maxRetries) {
controller.error(error);
return;
}
retries++;
await new Promise(r => setTimeout(r, retryDelay * retries));
}
}
},
});
}
// Usage with Range requests
const stream = await createResumableStream(async (offset) => {
const response = await fetch(url, {
headers: offset > 0 ? { Range: `bytes=${offset}-` } : {},
});
return response.body!;
});
Performance Patterns
Chunking Strategy
┌─────────────────────────────────────────────────────────────────────────────┐
│ CHUNK SIZE TRADEOFFS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Small chunks (e.g., 1KB) │
│ ├── ✓ Lower memory usage │
│ ├── ✓ Faster time-to-first-byte │
│ ├── ✓ More responsive backpressure │
│ ├── ✗ Higher overhead (more iterations) │
│ └── ✗ More function calls, object allocations │
│ │
│ Large chunks (e.g., 1MB) │
│ ├── ✓ Lower overhead per byte │
│ ├── ✓ Fewer iterations │
│ ├── ✗ Higher memory spikes │
│ ├── ✗ Slower time-to-first-byte │
│ └── ✗ Less responsive to consumer speed │
│ │
│ Recommended defaults: │
│ • File streaming: 64KB - 256KB │
│ • Text processing: 4KB - 16KB │
│ • LLM tokens: As they arrive (no buffering) │
│ • Database records: Batch 100-1000 records │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
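The recommendations above can be enforced with a coalescing transform: buffer incoming chunks until a target size is reached, then emit one merged chunk. A sketch; the 64 KB default mirrors the file-streaming row in the table:

```typescript
function createRechunker(
  targetSize = 64 * 1024
): TransformStream<Uint8Array, Uint8Array> {
  let pending: Uint8Array[] = [];
  let pendingBytes = 0;
  function merge(): Uint8Array {
    // Copy accumulated parts into one contiguous buffer
    const merged = new Uint8Array(pendingBytes);
    let offset = 0;
    for (const part of pending) {
      merged.set(part, offset);
      offset += part.length;
    }
    pending = [];
    pendingBytes = 0;
    return merged;
  }
  return new TransformStream({
    transform(chunk, controller) {
      pending.push(chunk);
      pendingBytes += chunk.length;
      if (pendingBytes >= targetSize) {
        controller.enqueue(merge());
      }
    },
    flush(controller) {
      // Emit whatever is left, even if under target
      if (pendingBytes > 0) {
        controller.enqueue(merge());
      }
    },
  });
}
```

Pipe a chatty source through it (`source.pipeThrough(createRechunker())`) so downstream stages see fewer, larger writes.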
Memory-Efficient Transforms
// BAD: Accumulating in memory
function badTransform(): TransformStream<string, string[]> {
const allLines: string[] = [];
return new TransformStream({
transform(chunk, controller) {
allLines.push(...chunk.split('\n')); // Memory grows!
},
flush(controller) {
controller.enqueue(allLines); // All at once
},
});
}
// GOOD: Process and emit immediately
function goodTransform(): TransformStream<string, string> {
let buffer = '';
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep only incomplete
for (const line of lines) {
controller.enqueue(line); // Emit immediately
}
},
flush(controller) {
if (buffer) {
controller.enqueue(buffer);
}
},
});
}
// BEST: Use TypedArrays for binary data
function binaryTransform(): TransformStream<Uint8Array, Uint8Array> {
return new TransformStream({
transform(chunk, controller) {
// Process in place, no copying
for (let i = 0; i < chunk.length; i++) {
chunk[i] = chunk[i] ^ 0xff; // XOR transform
}
controller.enqueue(chunk); // Same buffer, no allocation
},
});
}
Async Iterator Integration
// Convert anything to streams
async function* generateData(): AsyncGenerator<string> {
for (let i = 0; i < 1000; i++) {
await new Promise(r => setTimeout(r, 10));
yield `Item ${i}\n`;
}
}
// AsyncIterator → ReadableStream
function asyncIteratorToStream<T>(
  iterable: AsyncIterable<T>
): ReadableStream<T> {
  // Create the iterator once; calling [Symbol.asyncIterator]() inside
  // pull() would restart iteration on every chunk
  const iterator = iterable[Symbol.asyncIterator]();
  return new ReadableStream({
    async pull(controller) {
      const { done, value } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    cancel(reason) {
      // Let generators run their finally blocks
      iterator.return?.(reason);
    },
  });
}
// Better: ReadableStream.from() (Node.js 20+, some browsers)
const stream = ReadableStream.from(generateData());
Complete Example: Streaming File Processing Pipeline
// Process large CSV, transform, upload to S3
import { S3Client, CreateMultipartUploadCommand, UploadPartCommand, CompleteMultipartUploadCommand } from '@aws-sdk/client-s3';
interface ProcessingOptions {
inputUrl: string;
outputBucket: string;
outputKey: string;
transform: (record: Record<string, string>) => Record<string, string> | null;
onProgress?: (stats: { bytesRead: number; recordsProcessed: number }) => void;
}
async function processLargeFile(options: ProcessingOptions) {
const { inputUrl, outputBucket, outputKey, transform, onProgress } = options;
const s3 = new S3Client({});
// Start multipart upload
const { UploadId } = await s3.send(
new CreateMultipartUploadCommand({
Bucket: outputBucket,
Key: outputKey,
})
);
// Tracking
let bytesRead = 0;
let recordsProcessed = 0;
let partNumber = 1;
const parts: { ETag: string; PartNumber: number }[] = [];
// Chunk buffer for S3 (minimum 5MB parts)
let outputBuffer = '';
const MIN_PART_SIZE = 5 * 1024 * 1024;
// Flush buffer to S3
async function flushToS3(final = false) {
if (outputBuffer.length < MIN_PART_SIZE && !final) return;
if (outputBuffer.length === 0) return;
const { ETag } = await s3.send(
new UploadPartCommand({
Bucket: outputBucket,
Key: outputKey,
UploadId,
PartNumber: partNumber,
Body: outputBuffer,
})
);
parts.push({ ETag: ETag!, PartNumber: partNumber });
partNumber++;
outputBuffer = '';
}
// Fetch and process
const response = await fetch(inputUrl);
await response.body!
.pipeThrough(new TransformStream({
transform(chunk, controller) {
bytesRead += chunk.length;
controller.enqueue(chunk);
},
}))
.pipeThrough(new TextDecoderStream())
.pipeThrough(createCsvParser())
.pipeThrough(new TransformStream({
transform(record, controller) {
const transformed = transform(record);
if (transformed) {
recordsProcessed++;
controller.enqueue(transformed);
if (onProgress && recordsProcessed % 1000 === 0) {
onProgress({ bytesRead, recordsProcessed });
}
}
},
}))
.pipeThrough(createJsonLinesSerializer())
.pipeTo(new WritableStream({
async write(chunk) {
outputBuffer += chunk;
await flushToS3();
},
async close() {
await flushToS3(true);
},
}));
// Complete multipart upload
await s3.send(
new CompleteMultipartUploadCommand({
Bucket: outputBucket,
Key: outputKey,
UploadId,
MultipartUpload: { Parts: parts },
})
);
return { bytesRead, recordsProcessed };
}
// CSV Parser
function createCsvParser(): TransformStream<string, Record<string, string>> {
let headers: string[] = [];
let buffer = '';
let isFirstLine = true;
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.trim()) continue;
const values = parseCSVLine(line);
if (isFirstLine) {
headers = values;
isFirstLine = false;
} else {
const record: Record<string, string> = {};
headers.forEach((h, i) => {
record[h] = values[i] || '';
});
controller.enqueue(record);
}
}
},
flush(controller) {
if (buffer.trim() && !isFirstLine) {
const values = parseCSVLine(buffer);
const record: Record<string, string> = {};
headers.forEach((h, i) => {
record[h] = values[i] || '';
});
controller.enqueue(record);
}
},
});
}
// JSON Lines serializer
function createJsonLinesSerializer(): TransformStream<Record<string, unknown>, string> {
return new TransformStream({
transform(record, controller) {
controller.enqueue(JSON.stringify(record) + '\n');
},
});
}
// CSV line parser (handles simple quoted fields; escaped "" quotes are out of scope)
function parseCSVLine(line: string): string[] {
const result: string[] = [];
let current = '';
let inQuotes = false;
for (const char of line) {
if (char === '"') {
inQuotes = !inQuotes;
} else if (char === ',' && !inQuotes) {
result.push(current.trim());
current = '';
} else {
current += char;
}
}
result.push(current.trim());
return result;
}
Production Checklist
┌─────────────────────────────────────────────────────────────────────────────┐
│ STREAMING PRODUCTION CHECKLIST │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Error Handling │
│ □ Streams have error handlers attached │
│ □ Pipeline errors propagate correctly │
│ □ Resources cleaned up on error (readers released, connections closed) │
│ □ Partial data handled gracefully │
│ │
│ Memory │
│ □ No unbounded buffering in transforms │
│ □ Backpressure respected (highWaterMark configured) │
│ □ Large allocations avoided in hot paths │
│ □ Streams properly closed/cancelled when done │
│ │
│ Performance │
│ □ Appropriate chunk sizes for use case │
│ □ Transforms don't block event loop │
│ □ Async operations batched where appropriate │
│ □ No unnecessary data copying │
│ │
│ Edge Runtime Compatibility │
│ □ Using Web Streams API (not Node.js streams) │
│ □ No Node.js-specific APIs │
│ □ Memory usage within limits (128MB typical) │
│ □ Execution time within limits (30s typical) │
│ │
│ Observability │
│ □ Progress reporting for long operations │
│ □ Metrics for throughput and latency │
│ □ Error tracking with context │
│ □ Request tracing across stream operations │
│ │
│ Testing │
│ □ Unit tests for transform logic │
│ □ Tests for backpressure behavior │
│ □ Tests for error scenarios │
│ □ Tests for cancellation │
│ □ Memory usage tests with large data │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
The Bottom Line
Streams change how you think about data:

- Memory becomes constant. Process terabytes with megabytes of RAM. The chunk is the unit, not the whole.
- Time-to-first-byte collapses. Users see results immediately. LLM responses appear token by token. Files download progressively.
- Backpressure is automatic. Fast producers don't overwhelm slow consumers. The system self-regulates.
- Composition is natural. Transforms chain together. Each stage is independent and testable.
- Edge compatibility is built-in. Web Streams work everywhere: browsers, workers, Node.js, Deno.
The mental shift: stop thinking "load then process" and start thinking "process as it flows." Every fetch response is a stream. Every file is a stream. Every database cursor is a stream. The question isn't whether to use streams—it's whether you're using them intentionally.
For performance-critical features, streaming isn't an optimization. It's the architecture.