Backend Stream Processing Internals: Windowing, Exactly-Once Semantics, Backpressure & Stateful Operators
Why Stream Processing?
Batch processing collects data, waits, then processes it in bulk. Stream processing handles data continuously as it arrives — event by event or in micro-batches. When you need real-time dashboards, fraud detection within milliseconds, or live anomaly alerting, batch is too slow.
Batch Processing:
──────────────────
Events:  ●●●●●●●●●●│●●●●●●●●●●│●●●●●●●●●●│
             │          │          │
          Batch 1    Batch 2    Batch 3
          (1 hour)   (1 hour)   (1 hour)
             │          │          │
          Process    Process    Process
          all at     all at     all at
          1:00 AM    2:00 AM    3:00 AM
Latency: minutes to hours
Throughput: very high (bulk operations)
Stream Processing:
──────────────────
Events: ●→●→●→●→●→●→●→●→●→●→●→●→●→●→●→●→●→
│ │ │ │ │ │ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼ ▼
Process each event as it arrives
(or in micro-batches of 100ms)
Latency: milliseconds to seconds
Throughput: high (but per-event overhead)
Stream Processing Pipeline:
┌────────┐   ┌──────────┐   ┌───────────┐   ┌──────────┐   ┌────────┐
│ Source │──→│  Filter  │──→│    Map    │──→│ Aggregate│──→│  Sink  │
│ (Kafka)│   │(drop bad)│   │(transform)│   │ (window) │   │(DB/API)│
└────────┘   └──────────┘   └───────────┘   └──────────┘   └────────┘
Stream Processing Core Abstractions
Fundamental Concepts:
Record/Event: A single data item in the stream
{ key: "user-42", value: { action: "click", page: "/home" }, timestamp: 1709553600000 }
Stream: Unbounded, continuously appended sequence of records
──●──●──●──●──●──●──●──●──●──●──→ (infinite)
Table: A snapshot of the latest value for each key
┌──────────┬───────────────────┐
│ Key │ Value │
├──────────┼───────────────────┤
│ user-42 │ { clicks: 15 } │
│ user-99 │ { clicks: 8 } │
└──────────┴───────────────────┘
Stream ←→ Table Duality:
Stream → Table: Apply each event as an update
Table → Stream: Emit each change as an event (CDC)
Topology: A DAG of processing operators
Source → Filter → Map → GroupBy → Aggregate → Sink
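The stream ↔ table duality can be shown in a few lines. This is an illustrative sketch, not part of the engine built below; `applyEvent`, `table`, and `changelog` are hypothetical names:

```typescript
// Stream → Table: fold a stream of per-user click events into a table keyed
// by user. Table → Stream: every update also emits a change event (CDC-style).
type ClickEvent = { key: string; clicks: number };

const table = new Map<string, { clicks: number }>();
const changelog: ClickEvent[] = [];

function applyEvent(key: string, clicks: number): void {
  const prev = table.get(key)?.clicks ?? 0;
  const next = prev + clicks;
  table.set(key, { clicks: next });       // stream → table: keep latest value
  changelog.push({ key, clicks: next });  // table → stream: emit the change
}

applyEvent("user-42", 10);
applyEvent("user-42", 5);
applyEvent("user-99", 8);
// table now holds user-42 → { clicks: 15 }, user-99 → { clicks: 8 };
// replaying the changelog rebuilds exactly the same table (the duality)
```

Replaying `changelog` into an empty map reproduces `table`, which is precisely how changelog-backed state recovery works later in this article.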
Building a Stream Processing Engine from Scratch
Record and Topology Types
interface StreamRecord<K, V> {
key: K;
value: V;
timestamp: number;
offset: number; // Position in source partition
partition: number;
topic: string;
headers?: Map<string, string>;
}
interface ProcessorContext<K, V> {
forward(record: StreamRecord<K, V>): void;
commit(): Promise<void>;
schedule(intervalMs: number, callback: () => void): void;
getStateStore<S>(name: string): StateStore<S>;
currentTimestamp(): number;
taskId: string;
}
interface Processor<KIn, VIn, KOut, VOut> {
init(context: ProcessorContext<KOut, VOut>): void;
process(record: StreamRecord<KIn, VIn>): void;
close(): void;
}
interface StateStore<V> {
get(key: string): V | undefined;
put(key: string, value: V): void;
delete(key: string): void;
range(from: string, to: string): Array<[string, V]>;
all(): Array<[string, V]>;
approximateNumEntries(): number;
flush(): Promise<void>;
}
Stateful Processing Engine
class StreamProcessingEngine {
private topology: ProcessorNode[] = [];
private stateStores: Map<string, InMemoryStateStore<any>> = new Map();
private committed: Map<string, number> = new Map(); // topic:partition → offset
private running: boolean = false;
constructor(
private consumer: StreamConsumer,
private producer: StreamProducer,
private changelogTopic?: string // For state store recovery
) {}
addSource(
name: string,
topics: string[],
processor: Processor<any, any, any, any>
): StreamProcessingEngine {
this.topology.push({
name,
type: 'source',
topics,
processor,
children: []
});
return this;
}
addProcessor(
name: string,
parentName: string,
processor: Processor<any, any, any, any>,
stateStoreNames?: string[]
): StreamProcessingEngine {
const parent = this.findNode(parentName);
if (!parent) throw new Error(`Parent ${parentName} not found`);
const node: ProcessorNode = {
name,
type: 'processor',
processor,
children: [],
stateStoreNames
};
parent.children.push(node);
this.topology.push(node);
return this;
}
addSink(
name: string,
parentName: string,
outputTopic: string
): StreamProcessingEngine {
const parent = this.findNode(parentName);
if (!parent) throw new Error(`Parent ${parentName} not found`);
const sinkProcessor: Processor<any, any, any, any> = {
init: () => {},
process: (record) => {
this.producer.send(outputTopic, record);
},
close: () => {}
};
parent.children.push({
name,
type: 'sink',
processor: sinkProcessor,
children: [],
outputTopic
});
return this;
}
addStateStore(name: string): StreamProcessingEngine {
this.stateStores.set(name, new InMemoryStateStore(name));
return this;
}
async start(): Promise<void> {
this.running = true;
// Recover state from changelog topic
if (this.changelogTopic) {
await this.recoverState();
}
// Initialize all processors
for (const node of this.topology) {
const context = this.createContext(node);
node.processor.init(context);
}
// Subscribe to source topics
const allTopics = this.topology
.filter(n => n.type === 'source')
.flatMap(n => n.topics || []);
await this.consumer.subscribe(allTopics);
// Main processing loop
while (this.running) {
const records = await this.consumer.poll(100); // 100ms poll timeout
for (const record of records) {
await this.processRecord(record);
}
// Periodic commit
await this.commitOffsets();
}
}
private async processRecord(record: StreamRecord<any, any>): Promise<void> {
// Find matching source nodes
const sourceNodes = this.topology.filter(
n => n.type === 'source' && n.topics?.includes(record.topic)
);
for (const source of sourceNodes) {
this.dispatchToNode(source, record);
}
}
private dispatchToNode(node: ProcessorNode, record: StreamRecord<any, any>): void {
// The processor decides what (if anything) reaches its children by calling
// context.forward(). Forwarding the raw record to children here as well
// would bypass filters and double-deliver every transformed record.
node.processor.process(record);
}
private createContext(node: ProcessorNode): ProcessorContext<any, any> {
const engine = this;
return {
forward(record: StreamRecord<any, any>): void {
for (const child of node.children) {
engine.dispatchToNode(child, record);
}
},
async commit(): Promise<void> {
await engine.commitOffsets();
},
schedule(intervalMs: number, callback: () => void): void {
setInterval(callback, intervalMs);
},
getStateStore<S>(name: string): StateStore<S> {
const store = engine.stateStores.get(name);
if (!store) throw new Error(`State store ${name} not found`);
return store as StateStore<S>;
},
currentTimestamp: () => Date.now(),
taskId: node.name
};
}
private async commitOffsets(): Promise<void> {
await this.consumer.commit();
// Flush state stores to changelog
if (this.changelogTopic) {
for (const [name, store] of this.stateStores) {
await store.flush();
}
}
}
private async recoverState(): Promise<void> {
if (!this.changelogTopic) return;
// Read changelog from beginning to rebuild state
const changelogConsumer = this.consumer.createChangelogConsumer(this.changelogTopic);
let record: StreamRecord<string, any> | null;
while ((record = await changelogConsumer.readNext()) !== null) {
const storeName = record.headers?.get('state-store');
if (storeName && this.stateStores.has(storeName)) {
const store = this.stateStores.get(storeName)!;
if (record.value === null) {
store.delete(record.key);
} else {
store.put(record.key, record.value);
}
}
}
}
async stop(): Promise<void> {
this.running = false;
for (const node of this.topology) {
node.processor.close();
}
await this.commitOffsets();
}
private findNode(name: string): ProcessorNode | undefined {
return this.topology.find(n => n.name === name);
}
}
interface ProcessorNode {
name: string;
type: 'source' | 'processor' | 'sink';
topics?: string[];
outputTopic?: string;
processor: Processor<any, any, any, any>;
children: ProcessorNode[];
stateStoreNames?: string[];
}
class InMemoryStateStore<V> implements StateStore<V> {
private data: Map<string, V> = new Map();
private dirty: Map<string, V | null> = new Map(); // Changelog buffer
constructor(private name: string) {}
get(key: string): V | undefined {
return this.data.get(key);
}
put(key: string, value: V): void {
this.data.set(key, value);
this.dirty.set(key, value);
}
delete(key: string): void {
this.data.delete(key);
this.dirty.set(key, null); // Tombstone
}
range(from: string, to: string): Array<[string, V]> {
const results: Array<[string, V]> = [];
for (const [key, value] of this.data) {
if (key >= from && key <= to) {
results.push([key, value]);
}
}
return results.sort((a, b) => a[0].localeCompare(b[0]));
}
all(): Array<[string, V]> {
return Array.from(this.data.entries());
}
approximateNumEntries(): number {
return this.data.size;
}
async flush(): Promise<void> {
// In production: write dirty entries to changelog topic
this.dirty.clear();
}
}
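The dirty-buffer idea in `InMemoryStateStore` can be condensed into a few lines. A minimal sketch, with `put`/`del`/`drainChangelog` as hypothetical names standing in for the class methods above:

```typescript
// Writes go to both the live map and a changelog buffer; deletes record a
// null tombstone so a changelog consumer knows to remove the key on replay.
const data = new Map<string, number>();
const dirty = new Map<string, number | null>();

function put(key: string, value: number): void {
  data.set(key, value);
  dirty.set(key, value);
}

function del(key: string): void {
  data.delete(key);
  dirty.set(key, null); // tombstone
}

// flush() would send these entries to the changelog topic, then clear
function drainChangelog(): Array<[string, number | null]> {
  const entries = Array.from(dirty.entries());
  dirty.clear();
  return entries;
}

put("user-42", 15);
put("user-99", 8);
del("user-99");
const batch = drainChangelog();
// batch: [["user-42", 15], ["user-99", null]] — the tombstone survives the drain
```

Note that consecutive writes to the same key collapse in the buffer (a Map keeps only the last value), which is exactly the compaction you want before shipping to a changelog topic.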
interface StreamConsumer {
subscribe(topics: string[]): Promise<void>;
poll(timeoutMs: number): Promise<StreamRecord<any, any>[]>;
commit(): Promise<void>;
createChangelogConsumer(topic: string): ChangelogConsumer;
}
interface StreamProducer {
send(topic: string, record: StreamRecord<any, any>): Promise<void>;
}
interface ChangelogConsumer {
readNext(): Promise<StreamRecord<string, any> | null>;
}
Windowing: Aggregating Bounded Slices of Unbounded Streams
Window Types:
Tumbling Window (fixed, non-overlapping):
──────┬──────┬──────┬──────┬──────→ time
│ W1 │ W2 │ W3 │ W4 │
│ 0-5m │ 5-10m│10-15m│15-20m│
Each event belongs to exactly ONE window.
Use: metrics per minute, hourly aggregations
Hopping/Sliding Window (fixed, overlapping):
──────────────────────────────────→ time
│←───── W1 (10 min) ─────→│
      │←───── W2 (10 min) ─────→│
            │←───── W3 (10 min) ─────→│
advance = 5 min, size = 10 min
Events can belong to MULTIPLE windows.
Use: moving averages, rolling counts
Session Window (dynamic, gap-based):
──●●●──gap──●●──gap──●●●●●●──gap──●●──→
  └S1┘     └S2┘      └─S3─┘      └S4┘
Window closes when no events arrive for gap duration.
Use: user sessions, activity bursts
Global Window:
──────────────────────────────────→ time
│ Everything │
Single window for all time.
Use with triggers: emit every N events or every N seconds.
Window Implementation
interface Window {
start: number;
end: number;
}
interface WindowAssigner {
assignWindows(timestamp: number): Window[];
windowSize: number;
}
class TumblingWindowAssigner implements WindowAssigner {
constructor(public windowSize: number) {}
assignWindows(timestamp: number): Window[] {
const start = Math.floor(timestamp / this.windowSize) * this.windowSize;
return [{ start, end: start + this.windowSize }];
}
}
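Tumbling assignment is pure arithmetic: floor the timestamp to the nearest window-size multiple. Restated as a standalone function for a runnable example:

```typescript
// Same logic as TumblingWindowAssigner.assignWindows, as a free function
function tumblingWindow(timestamp: number, windowSize: number): { start: number; end: number } {
  const start = Math.floor(timestamp / windowSize) * windowSize;
  return { start, end: start + windowSize };
}

// An event at t=125s with 60s windows lands in the [120s, 180s) window
const w = tumblingWindow(125_000, 60_000);
// → { start: 120000, end: 180000 }
```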
class HoppingWindowAssigner implements WindowAssigner {
constructor(
public windowSize: number,
private advance: number
) {}
assignWindows(timestamp: number): Window[] {
const windows: Window[] = [];
// Find all windows this timestamp falls into
const firstWindowStart = Math.floor(timestamp / this.advance) * this.advance - this.windowSize + this.advance;
for (let start = firstWindowStart; start <= timestamp; start += this.advance) {
if (start + this.windowSize > timestamp) {
windows.push({ start, end: start + this.windowSize });
}
}
return windows;
}
}
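To make the hopping arithmetic concrete: with size 10s and advance 5s, an event at t=12s belongs to both the [5s, 15s) and [10s, 20s) windows. A standalone restatement of the assigner's logic:

```typescript
// Same logic as HoppingWindowAssigner.assignWindows, as a free function
function hoppingWindows(timestamp: number, size: number, advance: number): Array<{ start: number; end: number }> {
  const windows: Array<{ start: number; end: number }> = [];
  // Earliest window whose range could still contain this timestamp
  const first = Math.floor(timestamp / advance) * advance - size + advance;
  for (let start = first; start <= timestamp; start += advance) {
    if (start + size > timestamp) windows.push({ start, end: start + size });
  }
  return windows;
}

const ws = hoppingWindows(12_000, 10_000, 5_000);
// → [{ start: 5000, end: 15000 }, { start: 10000, end: 20000 }]
```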
class SessionWindowAssigner implements WindowAssigner {
windowSize: number; // Actually the gap duration
constructor(private gapMs: number) {
this.windowSize = gapMs;
}
// Session windows are assigned dynamically — they merge
assignWindows(timestamp: number): Window[] {
return [{ start: timestamp, end: timestamp + this.gapMs }];
}
}
type AggregatorFn<V, A> = {
initialValue: () => A;
add: (accumulator: A, value: V) => A;
remove?: (accumulator: A, value: V) => A; // For hopping windows
merge?: (a: A, b: A) => A; // For session window merging
};
class WindowedAggregator<K, V, A> {
private windowStore: Map<string, A> = new Map(); // "key:windowStart" → aggregate
private windowTimers: Map<string, NodeJS.Timeout> = new Map();
constructor(
private assigner: WindowAssigner,
private aggregator: AggregatorFn<V, A>,
private watermarkTracker: WatermarkTracker,
private onWindowClose: (key: K, window: Window, result: A) => void,
private allowedLateness: number = 0
) {}
process(key: K, value: V, timestamp: number): void {
const windows = this.assigner.assignWindows(timestamp);
for (const window of windows) {
const storeKey = `${String(key)}:${window.start}`;
// Check if window is still open (considering watermark + lateness)
const watermark = this.watermarkTracker.currentWatermark();
if (window.end + this.allowedLateness < watermark) {
// Late event beyond allowed lateness — drop or send to side output
continue;
}
// Get or initialize aggregate
let aggregate = this.windowStore.get(storeKey);
if (aggregate === undefined) {
aggregate = this.aggregator.initialValue();
this.scheduleWindowClose(key, window, storeKey);
}
// Apply aggregation
aggregate = this.aggregator.add(aggregate, value);
this.windowStore.set(storeKey, aggregate);
}
// Advance watermark
this.watermarkTracker.update(timestamp);
}
private scheduleWindowClose(key: K, window: Window, storeKey: string): void {
// Window closes when watermark passes window.end + lateness
const checkClose = () => {
const watermark = this.watermarkTracker.currentWatermark();
if (watermark >= window.end + this.allowedLateness) {
const result = this.windowStore.get(storeKey);
if (result !== undefined) {
this.onWindowClose(key, window, result);
this.windowStore.delete(storeKey);
this.windowTimers.delete(storeKey);
}
} else {
// Check again later
const timer = setTimeout(checkClose, 1000);
this.windowTimers.set(storeKey, timer);
}
};
const timer = setTimeout(checkClose, this.assigner.windowSize);
this.windowTimers.set(storeKey, timer);
}
// Session window merging: merge overlapping sessions
mergeSessionWindows(key: K): void {
if (!(this.assigner instanceof SessionWindowAssigner)) return;
const prefix = `${String(key)}:`;
const sessions: Array<{ start: number; end: number; storeKey: string; value: A }> = [];
for (const [storeKey, value] of this.windowStore) {
if (storeKey.startsWith(prefix)) {
const start = parseInt(storeKey.slice(prefix.length));
sessions.push({
start,
end: start + this.assigner.windowSize,
storeKey,
value
});
}
}
// Sort by start time
sessions.sort((a, b) => a.start - b.start);
// Merge overlapping sessions
for (let i = 0; i < sessions.length - 1; i++) {
if (sessions[i].end >= sessions[i + 1].start) {
// Merge session i+1 into session i
sessions[i].end = Math.max(sessions[i].end, sessions[i + 1].end);
if (this.aggregator.merge) {
sessions[i].value = this.aggregator.merge(sessions[i].value, sessions[i + 1].value);
}
// Remove session i+1
this.windowStore.delete(sessions[i + 1].storeKey);
sessions.splice(i + 1, 1);
i--; // Recheck current position
}
}
}
}
Watermarks & Event-Time Processing
The Problem: Events Arrive Out of Order
Event time:   10:01  10:03  10:02  10:05  10:04  10:06
Arrival time: 10:01  10:02  10:02  10:03  10:06  10:06
                                            ↑
                                 10:04 arrived late!
If we close the [10:00-10:05) window at arrival time 10:05,
we miss the event with event time 10:04.
Watermark: "I believe no events with timestamp < W will arrive"
Events:    ●(10:01) ●(10:03) ●(10:02) ●(10:05) ●(10:04) ●(10:06)
Watermark: ──10:00────10:01────10:01────10:03────10:03────10:04──→
Watermark advances conservatively.
Window [10:00-10:05) closes when watermark reaches 10:05.
Late events (after watermark) either:
- Dropped (simple)
- Processed with allowed lateness (window stays open longer)
- Sent to side output (dead letter)
Watermark Implementation
class WatermarkTracker {
private watermark: number = -Infinity;
private maxOutOfOrderness: number; // Expected max lateness
private lastEventTime: number = 0;
constructor(maxOutOfOrdernessMs: number = 5000) {
this.maxOutOfOrderness = maxOutOfOrdernessMs;
}
update(eventTimestamp: number): void {
this.lastEventTime = Math.max(this.lastEventTime, eventTimestamp);
// Watermark = max event time seen - max out-of-orderness
const proposedWatermark = this.lastEventTime - this.maxOutOfOrderness;
// Watermark only advances (never goes backward)
this.watermark = Math.max(this.watermark, proposedWatermark);
}
currentWatermark(): number {
return this.watermark;
}
// Periodic watermark advancement when no events arrive
advanceOnIdle(currentProcessingTime: number): void {
// If no events for 2x max out-of-orderness, advance watermark
// This prevents windows from staying open forever during idle periods
const idleThreshold = this.maxOutOfOrderness * 2;
if (currentProcessingTime - this.lastEventTime > idleThreshold) {
// Guard with Math.max so the watermark can never move backward
this.watermark = Math.max(this.watermark, currentProcessingTime - this.maxOutOfOrderness);
}
}
}
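The tracker's two invariants, trailing the max event time by the allowed lag and never regressing, fit in a closure. A condensed restatement of `WatermarkTracker` (the factory name `makeWatermarkTracker` is illustrative):

```typescript
// Bounded-out-of-orderness watermark: watermark = max event time seen - maxLag,
// and it only ever moves forward.
function makeWatermarkTracker(maxLag: number) {
  let maxEventTime = -Infinity;
  let watermark = -Infinity;
  return {
    update(eventTs: number): void {
      maxEventTime = Math.max(maxEventTime, eventTs);
      watermark = Math.max(watermark, maxEventTime - maxLag);
    },
    current: (): number => watermark,
  };
}

const wm = makeWatermarkTracker(5_000);
wm.update(10_000); // watermark → 5000
wm.update(7_000);  // late event: max event time unchanged, watermark stays 5000
wm.update(20_000); // watermark → 15000
```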
// Per-partition watermark tracking
class PartitionedWatermarkTracker {
private partitionWatermarks: Map<number, number> = new Map();
updatePartition(partition: number, eventTimestamp: number, maxLag: number): void {
const current = this.partitionWatermarks.get(partition) || -Infinity;
const proposed = eventTimestamp - maxLag;
this.partitionWatermarks.set(partition, Math.max(current, proposed));
}
// Global watermark = minimum of all partition watermarks
globalWatermark(): number {
if (this.partitionWatermarks.size === 0) return -Infinity;
return Math.min(...this.partitionWatermarks.values());
}
}
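The min-across-partitions rule is worth seeing with numbers: one slow or idle partition holds back the entire pipeline's event-time clock, which is exactly why the idle-advancement logic above exists. The partition values below are made up for illustration:

```typescript
// Per-partition watermarks; the global watermark is their minimum
const partitionWatermarks = new Map<number, number>([
  [0, 15_000],
  [1, 9_000],  // slow partition drags everyone down
  [2, 14_000],
]);

const globalWatermark = Math.min(...partitionWatermarks.values());
// → 9000: only windows ending at or before 9s may close, despite
//   partitions 0 and 2 having seen much later events
```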
Exactly-Once Semantics
Delivery Guarantees:
At-Most-Once:
Producer → Broker → Consumer
Fire and forget. May lose messages.
Consumer commits offset BEFORE processing → crash mid-process → message skipped
At-Least-Once:
Producer → Broker → Consumer
Producer retries on failure. Consumer commits AFTER processing.
If consumer crashes after processing but before commit → reprocess
Process → [crash] → Restart → Process AGAIN → Commit
Side effects happen TWICE
Exactly-Once:
Producer → Broker → Consumer
Each message is processed exactly once, even with failures.
Requires: idempotent producers + transactional consumers
Kafka's approach:
1. Idempotent producer: broker deduplicates by producer ID + sequence #
2. Transactions: atomically write output + commit input offset
3. Consumer reads only COMMITTED messages (isolation.level=read_committed)
Exactly-Once Processing Implementation
interface TransactionalProducer {
initTransactions(): Promise<void>;
beginTransaction(): void;
send(topic: string, record: StreamRecord<any, any>): Promise<void>;
sendOffsetsToTransaction(offsets: Map<string, number>): void;
commitTransaction(): Promise<void>;
abortTransaction(): Promise<void>;
}
class ExactlyOnceProcessor<KIn, VIn, KOut, VOut> {
private txProducer: TransactionalProducer;
private consumer: StreamConsumer;
private processor: Processor<KIn, VIn, KOut, VOut>;
private outputBuffer: StreamRecord<KOut, VOut>[] = [];
private running: boolean = false;
constructor(
txProducer: TransactionalProducer,
consumer: StreamConsumer,
processor: Processor<KIn, VIn, KOut, VOut>,
private outputTopic: string,
private commitIntervalMs: number = 200
) {
this.txProducer = txProducer;
this.consumer = consumer;
this.processor = processor;
}
async start(): Promise<void> {
this.running = true;
await this.txProducer.initTransactions();
const context = this.createContext();
this.processor.init(context);
let lastCommit = Date.now();
// A transaction can span several poll batches, so consumed records must be
// accumulated until commit; committing only the latest batch would leave
// earlier offsets uncommitted while their outputs are already produced.
let pendingRecords: StreamRecord<any, any>[] = [];
while (this.running) {
const records = await this.consumer.poll(100);
for (const record of records) {
this.processor.process(record as StreamRecord<KIn, VIn>);
pendingRecords.push(record);
}
// Commit periodically (not per-record) to amortize transaction overhead
if (Date.now() - lastCommit >= this.commitIntervalMs && pendingRecords.length > 0) {
await this.commitTransaction(pendingRecords);
pendingRecords = [];
lastCommit = Date.now();
}
}
}
private async commitTransaction(inputRecords: StreamRecord<any, any>[]): Promise<void> {
try {
// Begin atomic transaction
this.txProducer.beginTransaction();
// Step 1: Send all output records within transaction
for (const record of this.outputBuffer) {
await this.txProducer.send(this.outputTopic, record);
}
// Step 2: Commit input offsets within SAME transaction
const offsets = new Map<string, number>();
for (const record of inputRecords) {
const key = `${record.topic}:${record.partition}`;
const current = offsets.get(key) || -1;
offsets.set(key, Math.max(current, record.offset + 1));
}
this.txProducer.sendOffsetsToTransaction(offsets);
// Step 3: Commit transaction atomically
// Output records + offset commits are ALL committed together
await this.txProducer.commitTransaction();
this.outputBuffer = [];
} catch (error) {
// Abort transaction — nothing was committed
await this.txProducer.abortTransaction();
this.outputBuffer = [];
throw error;
}
}
private createContext(): ProcessorContext<KOut, VOut> {
return {
forward: (record: StreamRecord<KOut, VOut>) => {
this.outputBuffer.push(record);
},
commit: async () => {}, // Handled by commitTransaction
schedule: (interval, callback) => setInterval(callback, interval),
getStateStore: (name) => { throw new Error('State stores not yet supported'); },
currentTimestamp: () => Date.now(),
taskId: 'eo-processor'
};
}
async stop(): Promise<void> {
this.running = false;
this.processor.close();
}
}
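The offset folding inside `commitTransaction` deserves a standalone look: the committed offset is last-processed + 1 (the next offset to read), taken per topic-partition. Restated as a free function with a hypothetical `Consumed` type:

```typescript
type Consumed = { topic: string; partition: number; offset: number };

// Fold a batch of consumed records into the offsets to commit with the
// transaction: max(offset) + 1 per "topic:partition" key.
function commitOffsets(records: Consumed[]): Map<string, number> {
  const offsets = new Map<string, number>();
  for (const r of records) {
    const key = `${r.topic}:${r.partition}`;
    offsets.set(key, Math.max(offsets.get(key) ?? -1, r.offset + 1));
  }
  return offsets;
}

const out = commitOffsets([
  { topic: "clicks", partition: 0, offset: 41 },
  { topic: "clicks", partition: 0, offset: 42 },
  { topic: "clicks", partition: 1, offset: 7 },
]);
// → "clicks:0" → 43, "clicks:1" → 8
```

Committing `offset + 1` matters: committing the last-processed offset itself would cause that record to be read again after a restart.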
Backpressure: Controlling Flow When Consumer Is Slower Than Producer
The Problem:
Producer: 100K events/sec ═══════════╗
║
Consumer: 10K events/sec ◄═══════════╝
║
Buffer grows unbounded
→ OOM crash
→ Message loss
Backpressure Strategies:
1. Drop (Lossy):
Producer ══●●●●●●●●══╗
║ buffer full
Drop new events → ●●●╳╳╳╳╳
Consumer ◄════●●●═════╝
Use: Metrics, monitoring (sampling OK)
2. Buffer + Spill to Disk:
Producer ══●●●●●●●●══╗
║ memory buffer full
Spill → [disk buffer]║
Consumer ◄════●●●═════╝ ← reads from memory, then disk
Use: Log processing, batch jobs
3. Rate Limiting (Push-Back):
Producer ══●●──SLOW DOWN──●●══╗
║ consumer signals "slow down"
Consumer ◄════●●═══════════════╝
Use: TCP flow control, Reactive Streams
4. Dynamic Scaling:
Producer ══●●●●●●●●══╗
║ consumer lag increasing
Auto-scale consumers: ║
Consumer 1 ◄════●●════╝
Consumer 2 ◄════●●════╝ (new)
Consumer 3 ◄════●●════╝ (new)
Use: Kafka consumer groups, Kubernetes HPA
Backpressure Implementation
class BackpressureController {
private bufferSize: number = 0;
private highWatermark: number;
private lowWatermark: number;
private paused: boolean = false;
// Callbacks
private onPause?: () => void;
private onResume?: () => void;
private onDrop?: (record: any) => void;
constructor(options: {
highWatermark: number; // Pause when buffer exceeds this
lowWatermark: number; // Resume when buffer drops below this
strategy: 'pause' | 'drop' | 'buffer';
onPause?: () => void;
onResume?: () => void;
onDrop?: (record: any) => void;
}) {
this.highWatermark = options.highWatermark;
this.lowWatermark = options.lowWatermark;
this.onPause = options.onPause;
this.onResume = options.onResume;
this.onDrop = options.onDrop;
}
recordReceived(size: number = 1): boolean {
this.bufferSize += size;
if (this.bufferSize >= this.highWatermark && !this.paused) {
this.paused = true;
this.onPause?.();
return false; // Signal: stop producing
}
return true; // OK to continue
}
recordProcessed(size: number = 1): void {
this.bufferSize = Math.max(0, this.bufferSize - size);
if (this.paused && this.bufferSize <= this.lowWatermark) {
this.paused = false;
this.onResume?.();
}
}
isPaused(): boolean {
return this.paused;
}
currentLoad(): number {
return this.bufferSize / this.highWatermark;
}
}
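The high/low watermark pair gives hysteresis: pause at the high mark, but resume only once the buffer drains below the low mark, so the system doesn't flap between paused and resumed. A condensed sketch of the controller above (`makeGate` is an illustrative name):

```typescript
// Minimal pause/resume gate with hysteresis, condensed from BackpressureController
function makeGate(high: number, low: number) {
  let size = 0;
  let paused = false;
  return {
    received(): boolean {         // returns false → producer should pause
      size++;
      if (size >= high) paused = true;
      return !paused;
    },
    processed(): void {
      size = Math.max(0, size - 1);
      if (paused && size <= low) paused = false;
    },
    isPaused: (): boolean => paused,
  };
}

const gate = makeGate(3, 1);
gate.received(); gate.received(); // buffer: 2, still open
const ok = gate.received();       // buffer: 3 → hits high mark, pauses (ok = false)
gate.processed();                 // buffer: 2, still paused (above low mark)
gate.processed();                 // buffer: 1 → at low mark, resumes
```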
// Reactive Streams-style pull-based backpressure
class PullBasedStream<T> {
private buffer: T[] = [];
private demand: number = 0; // Outstanding demand from subscriber
private subscriber?: StreamSubscriber<T>;
subscribe(subscriber: StreamSubscriber<T>): void {
this.subscriber = subscriber;
subscriber.onSubscribe({
request: (n: number) => {
this.demand += n;
this.drain();
},
cancel: () => {
this.subscriber = undefined;
}
});
}
emit(item: T): void {
this.buffer.push(item);
this.drain();
}
private drain(): void {
while (this.demand > 0 && this.buffer.length > 0 && this.subscriber) {
const item = this.buffer.shift()!;
this.demand--;
this.subscriber.onNext(item);
}
}
}
interface StreamSubscriber<T> {
onSubscribe(subscription: StreamSubscription): void;
onNext(item: T): void;
onError(error: Error): void;
onComplete(): void;
}
interface StreamSubscription {
request(n: number): void;
cancel(): void;
}
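The demand-driven behavior of `PullBasedStream` can be exercised in miniature: the producer may only push as many items as the subscriber has requested, and the rest wait in the buffer. A compact closure-based sketch (`makePullStream` is an illustrative name, not part of the interfaces above):

```typescript
// Reactive Streams-style demand in ~15 lines: items are delivered only
// against outstanding demand; surplus items queue in the buffer.
function makePullStream<T>(onNext: (item: T) => void) {
  const buffer: T[] = [];
  let demand = 0;
  const drain = (): void => {
    while (demand > 0 && buffer.length > 0) {
      demand--;
      onNext(buffer.shift()!);
    }
  };
  return {
    emit(item: T): void { buffer.push(item); drain(); },
    request(n: number): void { demand += n; drain(); },
    pending: (): number => buffer.length,
  };
}

const delivered: number[] = [];
const stream = makePullStream<number>(x => delivered.push(x));
stream.request(2);
stream.emit(1); stream.emit(2); stream.emit(3);
// delivered: [1, 2] — item 3 waits until the subscriber asks for more
stream.request(1);
// delivered: [1, 2, 3]
```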
// Consumer lag monitoring for auto-scaling decisions
class ConsumerLagMonitor {
private lagSamples: Map<string, number[]> = new Map();
private readonly maxSamples = 60; // 1 minute of samples (1/sec)
recordLag(consumerGroup: string, partition: number, lag: number): void {
const key = `${consumerGroup}:${partition}`;
if (!this.lagSamples.has(key)) {
this.lagSamples.set(key, []);
}
const samples = this.lagSamples.get(key)!;
samples.push(lag);
if (samples.length > this.maxSamples) {
samples.shift();
}
}
shouldScaleUp(consumerGroup: string): ScaleDecision {
let totalLag = 0;
let increasing = 0;
let partitionCount = 0;
for (const [key, samples] of this.lagSamples) {
if (!key.startsWith(consumerGroup + ':')) continue;
partitionCount++;
totalLag += samples[samples.length - 1] || 0;
// Check if lag is trending up (recent 5-sample average vs the 5 before)
if (samples.length >= 10) {
const recentAvg = average(samples.slice(-5));
const olderAvg = average(samples.slice(-10, -5));
if (recentAvg > olderAvg * 1.2) increasing++;
}
}
return {
shouldScale: increasing > partitionCount * 0.5, // >50% partitions growing
totalLag,
partitionCount,
increasingPartitions: increasing,
suggestedConsumers: Math.min(
partitionCount, // Can't have more consumers than partitions
Math.ceil(totalLag / 10000) // 1 consumer per 10K lag
)
};
}
}
interface ScaleDecision {
shouldScale: boolean;
totalLag: number;
partitionCount: number;
increasingPartitions: number;
suggestedConsumers: number;
}
function average(nums: number[]): number {
return nums.reduce((sum, n) => sum + n, 0) / nums.length;
}
Common Stream Processing Operators
// Filter: drop records that don't match predicate
class FilterProcessor<K, V> implements Processor<K, V, K, V> {
private ctx!: ProcessorContext<K, V>;
constructor(private predicate: (key: K, value: V) => boolean) {}
init(context: ProcessorContext<K, V>): void { this.ctx = context; }
process(record: StreamRecord<K, V>): void {
if (this.predicate(record.key, record.value)) {
this.ctx.forward(record);
}
}
close(): void {}
}
// Map: transform each record
class MapProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private ctx!: ProcessorContext<KOut, VOut>;
constructor(private mapper: (key: KIn, value: VIn) => { key: KOut; value: VOut }) {}
init(context: ProcessorContext<KOut, VOut>): void { this.ctx = context; }
process(record: StreamRecord<KIn, VIn>): void {
const { key, value } = this.mapper(record.key, record.value);
this.ctx.forward({ ...record, key, value } as any);
}
close(): void {}
}
// FlatMap: one input → zero or more outputs
class FlatMapProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private ctx!: ProcessorContext<KOut, VOut>;
constructor(private mapper: (key: KIn, value: VIn) => Array<{ key: KOut; value: VOut }>) {}
init(context: ProcessorContext<KOut, VOut>): void { this.ctx = context; }
process(record: StreamRecord<KIn, VIn>): void {
const results = this.mapper(record.key, record.value);
for (const { key, value } of results) {
this.ctx.forward({ ...record, key, value } as any);
}
}
close(): void {}
}
// Stateful aggregate with state store
class AggregateProcessor<K, V, A> implements Processor<K, V, K, A> {
private ctx!: ProcessorContext<K, A>;
private store!: StateStore<A>;
constructor(
private storeName: string,
private initializer: () => A,
private aggregator: (key: K, value: V, aggregate: A) => A
) {}
init(context: ProcessorContext<K, A>): void {
this.ctx = context;
this.store = context.getStateStore(this.storeName);
}
process(record: StreamRecord<K, V>): void {
const keyStr = String(record.key);
let current = this.store.get(keyStr);
if (current === undefined) {
current = this.initializer();
}
const updated = this.aggregator(record.key, record.value, current);
this.store.put(keyStr, updated);
this.ctx.forward({ ...record, value: updated } as any);
}
close(): void {}
}
// Join: combine two streams by key within a time window
class StreamStreamJoin<K, V1, V2, VOut> {
private buffer1: Map<string, Array<{ value: V1; timestamp: number }>> = new Map();
private buffer2: Map<string, Array<{ value: V2; timestamp: number }>> = new Map();
constructor(
private joinWindow: number, // ms
private joiner: (key: K, left: V1, right: V2) => VOut,
private onResult: (key: K, result: VOut, timestamp: number) => void
) {}
processLeft(key: K, value: V1, timestamp: number): void {
const keyStr = String(key);
// Store in left buffer
if (!this.buffer1.has(keyStr)) this.buffer1.set(keyStr, []);
this.buffer1.get(keyStr)!.push({ value, timestamp });
// Check right buffer for matching records within window
const rightRecords = this.buffer2.get(keyStr) || [];
for (const right of rightRecords) {
if (Math.abs(right.timestamp - timestamp) <= this.joinWindow) {
const result = this.joiner(key, value, right.value);
this.onResult(key, result, Math.max(timestamp, right.timestamp));
}
}
this.cleanExpired(this.buffer1, timestamp);
}
processRight(key: K, value: V2, timestamp: number): void {
const keyStr = String(key);
if (!this.buffer2.has(keyStr)) this.buffer2.set(keyStr, []);
this.buffer2.get(keyStr)!.push({ value, timestamp });
const leftRecords = this.buffer1.get(keyStr) || [];
for (const left of leftRecords) {
if (Math.abs(left.timestamp - timestamp) <= this.joinWindow) {
const result = this.joiner(key, left.value, value);
this.onResult(key, result, Math.max(timestamp, left.timestamp));
}
}
this.cleanExpired(this.buffer2, timestamp);
}
private cleanExpired(buffer: Map<string, Array<{ timestamp: number }>>, currentTime: number): void {
for (const [key, records] of buffer) {
const filtered = records.filter(r => currentTime - r.timestamp <= this.joinWindow * 2);
if (filtered.length === 0) {
buffer.delete(key);
} else {
buffer.set(key, filtered as any);
}
}
}
}
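The join condition itself, two same-keyed records match when their timestamps are within `joinWindow` of each other, is easy to isolate. A brute-force sketch with a hypothetical `intervalJoin` helper (the buffered class above does the same check incrementally):

```typescript
type Timed<V> = { value: V; timestamp: number };

// Emit a pair for every left/right combination within the join window
function intervalJoin<V1, V2>(
  left: Timed<V1>[],
  right: Timed<V2>[],
  joinWindow: number
): Array<[V1, V2]> {
  const out: Array<[V1, V2]> = [];
  for (const l of left) {
    for (const r of right) {
      if (Math.abs(l.timestamp - r.timestamp) <= joinWindow) out.push([l.value, r.value]);
    }
  }
  return out;
}

const clicks = [{ value: "click-a", timestamp: 1_000 }, { value: "click-b", timestamp: 9_000 }];
const views = [{ value: "view-x", timestamp: 1_500 }];
const joined = intervalJoin(clicks, views, 2_000);
// → [["click-a", "view-x"]]: click-b is 7.5s from view-x, outside the 2s window
```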
Comparing Stream Processing Frameworks
┌──────────────┬───────────────┬───────────────┬────────────────┬───────────────┐
│ Feature      │ Kafka Streams │ Apache Flink  │ Spark Streaming│ Pulsar Func.  │
├──────────────┼───────────────┼───────────────┼────────────────┼───────────────┤
│ Model        │ Event-at-a-   │ Event-at-a-   │ Micro-batch    │ Event/Function│
│              │ time          │ time          │ (100ms+)       │               │
│ Deployment   │ Library (JVM) │ Cluster       │ Cluster        │ Serverless /  │
│              │               │ (standalone)  │ (YARN/K8s)     │ Library       │
│ State        │ RocksDB local │ RocksDB +     │ In-memory +    │ BookKeeper    │
│              │ + changelog   │ checkpoints   │ checkpoints    │               │
│ Exactly-Once │ Transactions  │ Checkpoints + │ WAL + check-   │ Effectively   │
│              │               │ 2PC sinks     │ points         │ once          │
│ Latency      │ ~1ms          │ ~1ms          │ ~100ms-1s      │ ~5ms          │
│ Throughput   │ ~500K/s       │ Millions/s    │ Millions/s     │ ~100K/s       │
│ Windowing    │ Time, Session │ Time, Session,│ Time only      │ Basic         │
│              │               │ Count, Custom │                │               │
│ Join Support │ KStream-      │ Full (stream- │ Stream-stream  │ Limited       │
│              │ KTable, etc.  │ stream, table)│                │               │
│ Scaling      │ Kafka consumer│ TaskManager   │ Executors      │ Instances     │
│              │ groups        │ parallelism   │ parallelism    │               │
│ Fault Tol.   │ Consumer      │ Checkpoints + │ RDD lineage    │ Auto-restart  │
│              │ rebalance     │ savepoints    │                │               │
│ Best For     │ Kafka-centric │ Complex event │ ML pipelines,  │ Simple event  │
│              │ microservices │ processing    │ batch + stream │ processing    │
└──────────────┴───────────────┴───────────────┴────────────────┴───────────────┘
Interview Questions
Q1: Explain the difference between event-time and processing-time in stream processing. Why does it matter?
Processing time is when the event reaches the processing engine — Date.now() on the server. Event time is when the event actually occurred — the timestamp embedded in the event payload. The distinction matters because events arrive out of order. A mobile app might batch events and send them 30 seconds late. Network delays, retries, and partition rebalancing all cause ordering disruption. If you window by processing time, events with event-time 10:01 might land in the 10:02 window because they arrived late. This produces incorrect aggregations. Event-time processing uses watermarks to track progress — the watermark represents the engine's belief that all events before time W have arrived. Windows close based on watermark advancement, not wall-clock time. The trade-off: event-time processing is more complex (requires watermark tracking, late event handling, allowed lateness configuration) but produces correct results. Processing-time is simpler and lower latency but produces approximate results.
Q2: How does Kafka achieve exactly-once semantics in stream processing?
Kafka's exactly-once has three components working together. (1) Idempotent producer: Each producer gets a unique PID (producer ID) from the broker. Every message carries a sequence number. The broker deduplicates by (PID, partition, sequence) — if it sees a duplicate sequence number, it discards the message but returns success. This prevents double-writes due to producer retries. (2) Transactions: A producer can start a transaction that atomically writes output messages AND commits consumer offsets. The broker maintains a transaction coordinator that tracks the transaction state. On commit, all writes become visible atomically. On abort, none are visible. (3) Read committed consumers: Consumers configured with isolation.level=read_committed only see messages from committed transactions. Aborted transaction messages are invisible. The combination means: consumer reads input → processes → producer writes output + commits input offset atomically. If the processor crashes, the transaction is either committed (all visible) or not (nothing visible) — no partial state.
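The atomicity at the heart of this can be sketched with an in-memory stand-in. All names below are illustrative (a real client such as kafkajs exposes a similar transaction object with send/sendOffsets/commit/abort, but this is not its API); the point is that outputs and the input-offset commit become visible together or not at all:

```typescript
// In-memory stand-in for the broker's view of a transaction.
interface TxnLog { outputs: string[]; committedOffset: number }

class Transaction {
  private pendingOutputs: string[] = [];
  private pendingOffset: number | null = null;
  constructor(private log: TxnLog) {}

  send(message: string): void { this.pendingOutputs.push(message); }
  sendOffsets(offset: number): void { this.pendingOffset = offset; }

  commit(): void {
    // Both effects apply together: output messages and the input offset.
    this.log.outputs.push(...this.pendingOutputs);
    if (this.pendingOffset !== null) this.log.committedOffset = this.pendingOffset;
  }
  abort(): void {
    // Neither effect applies: read_committed consumers never see the outputs.
    this.pendingOutputs = [];
    this.pendingOffset = null;
  }
}

const log: TxnLog = { outputs: [], committedOffset: 0 };
const txn = new Transaction(log);
txn.send('enriched-event-1'); // processed output
txn.sendOffsets(43);          // input offset committed in the same transaction
txn.commit();
```

A crash before `commit()` leaves `log` untouched, so reprocessing from the old offset produces no partial state.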
Q3: What is backpressure in stream processing, and how do you handle it?
Backpressure occurs when a downstream component processes slower than the upstream produces. Without control, internal buffers grow until OOM. Four strategies: (1) Drop: Discard excess events (lossy). Suitable for metrics where sampling is acceptable. (2) Buffer with bounds: Accept events into a bounded buffer. When full, block the producer or spill to disk. This absorbs temporary bursts but OOMs on sustained overload. (3) Rate limiting / push-back: The consumer signals the producer to slow down. TCP does this natively with window sizes. Reactive Streams (Java/Kotlin) use request(n) where the subscriber tells the publisher how many items it can accept. Flink propagates backpressure through the operator chain via network buffers — when a downstream buffer is full, the upstream operator blocks on emit(). (4) Auto-scaling: Monitor consumer lag and add more consumer instances. Kafka consumer groups rebalance partitions across new consumers automatically. This is the only strategy that actually increases throughput rather than just controlling flow.
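The request(n) contract can be sketched in a few lines. This is a toy in the spirit of Reactive Streams, not a spec-compliant implementation; the key property is that the publisher emits only what the subscriber has asked for:

```typescript
type Subscriber<T> = { onNext: (item: T) => void };

class Publisher<T> {
  private cursor = 0;
  constructor(private items: T[], private subscriber: Subscriber<T>) {}

  // The subscriber calls request(n) when it has capacity for n more items,
  // so the publisher can never outrun it.
  request(n: number): void {
    for (let i = 0; i < n && this.cursor < this.items.length; i++) {
      this.subscriber.onNext(this.items[this.cursor++]);
    }
  }
}

const received: number[] = [];
const pub = new Publisher([1, 2, 3, 4, 5], { onNext: (x) => received.push(x) });
pub.request(2); // consumer has room for 2 items
pub.request(2); // later, room for 2 more; item 5 stays with the publisher
```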
Q4: How do windowed aggregations work, and what happens with late-arriving events?
Windowed aggregations group events into time-bounded buckets and compute aggregates per bucket. Tumbling windows are fixed, non-overlapping intervals (e.g., every 5 minutes). Hopping windows overlap (10-minute window advancing every 5 minutes). Session windows are dynamic — they close after a gap of inactivity. When a window closes, it emits its aggregate result. Windows close based on watermark advancement — when the watermark passes the window's end time, the window fires. Late events (arriving after watermark but within the allowed lateness) trigger an updated result for the already-fired window — this updates downstream. Events arriving beyond the allowed lateness are dropped or sent to a side output. The allowed lateness creates a trade-off: larger lateness means better completeness but higher memory (more open windows) and higher output latency. In practice, set allowed lateness based on your domain — real-time dashboards might tolerate 0 lateness, billing aggregations might allow 1 hour.
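Window assignment itself is just arithmetic on the event timestamp. A minimal sketch for tumbling and hopping windows (session windows need per-key state, so they are omitted here):

```typescript
// Tumbling: each timestamp maps to exactly one window start.
function tumblingWindow(ts: number, sizeMs: number): number {
  return Math.floor(ts / sizeMs) * sizeMs;
}

// Hopping: each timestamp maps to every window [start, start + size)
// that covers it, one per hop of `advanceMs`.
function hoppingWindows(ts: number, sizeMs: number, advanceMs: number): number[] {
  const starts: number[] = [];
  // Earliest window start that can still contain ts:
  let start = Math.floor((ts - sizeMs + advanceMs) / advanceMs) * advanceMs;
  if (start < 0) start = 0;
  for (; start <= ts; start += advanceMs) starts.push(start);
  return starts;
}

tumblingWindow(12_000, 5_000);         // → 10000 (the [10s, 15s) window)
hoppingWindows(12_000, 10_000, 5_000); // → [5000, 10000] (two overlapping windows)
```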
Q5: Explain the stream-table duality. How is it used in practice?
A stream is an unbounded sequence of immutable events. A table is a mutable snapshot of the latest value per key. They're dual representations of the same data: (1) Stream → Table: Apply each stream event as an update to the table. INSERT key=A, value=1 then UPDATE key=A, value=2 gives a table with A=2. This is called "materializing" a stream. (2) Table → Stream: Emit every change to the table as a stream event. This is Change Data Capture (CDC). In Kafka Streams, a KTable is a compacted topic where each key retains only the latest value. When you join a KStream (events) with a KTable (lookup), the KTable acts like a dimension table that's continuously updated. Example: join a stream of order events with a customer KTable to enrich orders with customer details. The KTable is backed by a local RocksDB store for fast lookups. As the customer topic receives updates, the local store is updated, and subsequent joins use the latest customer data. This pattern replaces traditional batch ETL joins with real-time enrichment.
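Both directions of the duality fit in a few lines. This is a minimal in-memory sketch; a real KTable would be backed by RocksDB and a compacted changelog topic:

```typescript
type KeyedEvent = { key: string; value: number };

const table = new Map<string, number>(); // latest value per key
const changelog: KeyedEvent[] = [];      // the table, viewed as a stream

function applyEvent(e: KeyedEvent): void {
  table.set(e.key, e.value); // stream → table: each event updates the snapshot
  changelog.push(e);         // table → stream: each update is emitted as CDC
}

applyEvent({ key: 'user-42', value: 1 });
applyEvent({ key: 'user-42', value: 2 });
// The table holds only the latest value; the changelog holds the full history.
```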
Real-World Problems & How to Solve Them
Problem 1: Window counts are wrong when events arrive late
Symptom: Hourly dashboards show spikes and dips that disappear after backfills.
Root cause: Windows are assigned by processing time instead of event time; out-of-order events land in the wrong bucket.
Fix — extract event-time and drive watermark from observed timestamps:
interface RawClick {
userId: string;
occurredAtIso: string;
page: string;
}
function toRecord(
raw: RawClick,
offset: number,
partition: number
): StreamRecord<string, RawClick> {
return {
key: raw.userId,
value: raw,
timestamp: Date.parse(raw.occurredAtIso),
offset,
partition,
topic: 'clicks',
};
}
class WatermarkTracker {
  private maxSeenTs = 0;
  constructor(private readonly maxOutOfOrderMs: number) {}
  // Watermark = max observed event-time minus the out-of-order bound.
  update(eventTs: number): number {
    this.maxSeenTs = Math.max(this.maxSeenTs, eventTs);
    return this.maxSeenTs - this.maxOutOfOrderMs;
  }
}
Problem 2: Late events are silently dropped
Symptom: Audit totals don’t match because delayed mobile events never appear in aggregates.
Root cause: Allowed lateness is zero, so post-watermark events are discarded.
Fix — keep a bounded lateness window and route too-late records to a dead-letter stream:
function routeLateEvent(
  record: StreamRecord<string, any>,
  windowEndMs: number,
  watermarkMs: number,
  allowedLatenessMs: number
): 'on-time' | 'late-update' | 'drop' {
  // Older than watermark minus allowed lateness: too late to update any window.
  if (record.timestamp <= watermarkMs - allowedLatenessMs) return 'drop';
  // Behind the watermark but within lateness: re-emit the window with an updated result.
  if (record.timestamp <= windowEndMs && record.timestamp < watermarkMs) return 'late-update';
  return 'on-time';
}
// usage: too-late records go to a dead-letter topic for auditing instead of vanishing
const route = routeLateEvent(record, windowEnd, currentWatermark, 5 * 60_000);
if (route === 'drop') deadLetterProducer.send('events.too-late', record);
Problem 3: Duplicates appear after consumer restarts
Symptom: Revenue counters jump whenever the stream app crashes and recovers.
Root cause: Offset commits happen separately from sink writes, creating at-least-once effects.
Fix — use idempotent sink writes with deterministic event IDs:
interface AggregatedRow {
eventId: string;
key: string;
windowStart: number;
count: number;
}
async function writeIdempotent(
db: { upsertByEventId: (row: AggregatedRow) => Promise<void> },
row: AggregatedRow
): Promise<void> {
await db.upsertByEventId(row);
}
// (topic, partition, offset) uniquely identifies a record and is stable across replays.
function makeEventId(record: StreamRecord<string, any>): string {
  return `${record.topic}:${record.partition}:${record.offset}`;
}
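To see why this makes replays harmless, a toy redelivery (the Map stands in for the database table, and `clicks:0:42` is a hypothetical topic:partition:offset id):

```typescript
const sink = new Map<string, number>(); // eventId → aggregated count

function upsertByEventId(eventId: string, count: number): void {
  sink.set(eventId, count); // replace by key, never accumulate
}

upsertByEventId('clicks:0:42', 15);
upsertByEventId('clicks:0:42', 15); // redelivered after a crash: overwrites, no double-count
```

The same record replayed any number of times converges to one row, which is why deterministic ids turn at-least-once delivery into effectively-once results.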
Problem 4: Stateful operators run out of memory
Symptom: JVM/Node memory climbs indefinitely in long-running jobs.
Root cause: Closed windows are never evicted; state store retains expired keys forever.
Fix — periodically purge state by watermark:
interface WindowAgg {
windowEnd: number;
count: number;
}
function evictClosedWindows(
  store: StateStore<WindowAgg>,
  watermarkMs: number,
  allowedLatenessMs: number
): void {
  for (const [key, agg] of store.all()) {
    // Safe to evict once the watermark has passed windowEnd + allowedLateness:
    // no event can legally update this window anymore.
    if (agg.windowEnd + allowedLatenessMs <= watermarkMs) {
      store.delete(key);
    }
  }
}
Problem 5: Pipeline stalls under burst traffic
Symptom: Source lag grows rapidly and CPU usage drops in downstream operators.
Root cause: Unbounded in-memory queues absorb bursts until GC thrashes; no backpressure contract.
Fix — add bounded queues and await drain before reading more:
class BoundedQueue<T> {
  private items: T[] = [];
  constructor(private readonly maxSize: number) {}
  // Blocks the producer (by polling) until the consumer drains below maxSize,
  // propagating backpressure to whoever awaits push().
  async push(item: T): Promise<void> {
    while (this.items.length >= this.maxSize) {
      await new Promise((r) => setTimeout(r, 5)); // yield until there is room
    }
    this.items.push(item);
  }
  pop(): T | undefined {
    return this.items.shift();
  }
}
Problem 6: Wrong results after failover recovery
Symptom: After restart, the app replays from the right offset but reconstructed state is stale.
Root cause: Offsets are committed before flushing local state/changelog, breaking checkpoint atomicity.
Fix — flush state first, then commit offsets in one checkpoint routine:
async function commitCheckpoint(
  stores: Array<StateStore<unknown>>,
  commitOffsets: () => Promise<void>
): Promise<void> {
  // Flush every state store (and its changelog) BEFORE acknowledging input;
  // committing offsets first would lose state on a crash in between.
  for (const store of stores) {
    await store.flush();
  }
  await commitOffsets();
}
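The ordering is the whole fix. A toy journal makes the invariant visible (synchronous stand-ins for the real async flush and commit; the names are illustrative):

```typescript
// Record the order of durable effects to check the checkpoint invariant:
// state must be durable BEFORE the input offsets are acknowledged.
const journal: string[] = [];

function flushStores(): void { journal.push('state-flushed'); }
function commitOffsets(): void { journal.push('offsets-committed'); }

function checkpoint(): void {
  flushStores();   // 1) make local state (and its changelog) durable
  commitOffsets(); // 2) only then mark the input as consumed
}

checkpoint();
```

With the order reversed, a crash between the two steps leaves offsets committed but state unflushed, which is exactly the stale-state symptom described above.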
Problem 7: One partition lags while others are idle
Symptom: End-to-end latency spikes for a few keys, even though cluster utilization looks low.
Root cause: Hot keys map to a single partition and overload one task.
Fix — shard hot keys with stable salting before partitioning:
function shardKey(key: string, shards: number): string {
  // Simple 31-multiplier string hash; ">>> 0" keeps it an unsigned 32-bit int.
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  const shard = hash % shards;
  return `${key}#${shard}`; // stable: the same key always maps to the same shard
}
// producer key for partitioning
const partitionKey = shardKey(record.key, 8);
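The consumer side has to undo the salting: partial aggregates for "user-42#0", "user-42#3", etc. are counts for the same logical key and must be merged before reporting. A sketch (`mergeShardCounts` is a hypothetical helper, not part of any library):

```typescript
function mergeShardCounts(shardCounts: Map<string, number>): Map<string, number> {
  const merged = new Map<string, number>();
  for (const [shardedKey, count] of shardCounts) {
    const baseKey = shardedKey.split('#')[0]; // strip the "#<shard>" suffix
    merged.set(baseKey, (merged.get(baseKey) ?? 0) + count);
  }
  return merged;
}

const partials = new Map([
  ['user-42#0', 10],
  ['user-42#3', 7],
  ['user-7#1', 2],
]);
const merged = mergeShardCounts(partials); // user-42 → 17, user-7 → 2
```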
Key Takeaways
- Stream processing handles unbounded data in real-time: Unlike batch processing, stream engines process events as they arrive with millisecond latency.
- Event time ≠ processing time: Always use event-time for correct aggregations. Watermarks track progress and trigger window closures.
- Exactly-once requires coordination across the pipeline: Kafka achieves it with idempotent producers + transactions + read-committed consumers. Flink uses distributed snapshots (Chandy-Lamport algorithm).
- Windowing is the core abstraction for bounded aggregations: Tumbling (fixed, non-overlapping), hopping (fixed, overlapping), and session (dynamic, gap-based) cover most use cases.
- Watermarks balance completeness vs latency: Conservative watermarks wait longer for late events (more complete, higher latency). Aggressive watermarks fire sooner (lower latency, more drops).
- Backpressure prevents OOM in unbalanced pipelines: Push-back (Reactive Streams), bounded buffers, and auto-scaling are the primary strategies. Flink and Kafka Streams handle this automatically.
- State is the hardest part of stream processing: Stateful operators (aggregations, joins) need fault-tolerant state stores. Kafka Streams uses RocksDB + changelog topics. Flink uses checkpoints to S3/HDFS.
- Stream-table duality enables real-time enrichment: Materialize a stream into a table for lookups. Emit table changes as a stream for downstream. This replaces batch ETL joins.
- Kafka Streams is a library, not a cluster: It runs inside your application as a dependency — no separate infrastructure. Scaling = adding more application instances.
- Choose the right tool for complexity: Simple event routing → Pulsar Functions. Kafka-centric apps → Kafka Streams. Complex event processing with custom windowing → Flink. ML + batch + stream → Spark Structured Streaming.