Back to Blog

Backend Stream Processing Internals: Windowing, Exactly-Once Semantics, Backpressure & Stateful Operators

March 23, 2026132 min read0 views

Backend Stream Processing Internals: Windowing, Exactly-Once Semantics, Backpressure & Stateful Operators

Why Stream Processing?

Batch processing collects data, waits, then processes it in bulk. Stream processing handles data continuously as it arrives — event by event or in micro-batches. When you need real-time dashboards, fraud detection within milliseconds, or live anomaly alerting, batch is too slow.

Batch Processing:
──────────────────
Events: ●●●●●●●●●●│●●●●●●●●●●│●●●●●●●●●●│
                   │          │          │
              Batch 1     Batch 2     Batch 3
              (1 hour)    (1 hour)    (1 hour)
                   │          │          │
              Process     Process     Process
              all at      all at      all at
              1:00 AM     2:00 AM     3:00 AM

Latency: minutes to hours
Throughput: very high (bulk operations)


Stream Processing:
──────────────────
Events: ●→●→●→●→●→●→●→●→●→●→●→●→●→●→●→●→●→
        │  │  │  │  │  │  │  │  │  │  │  │
        ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼  ▼
     Process each event as it arrives
     (or in micro-batches of 100ms)

Latency: milliseconds to seconds
Throughput: high (but per-event overhead)


Stream Processing Pipeline:
┌────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌────────┐
│ Source │──→│  Filter  │──→│   Map    │──→│ Aggregate│──→│  Sink  │
│ (Kafka)│   │(drop bad)│   │(transform)│  │(window)  │   │(DB/API)│
└────────┘   └──────────┘   └──────────┘   └──────────┘   └────────┘

Stream Processing Core Abstractions

Fundamental Concepts:

Record/Event: A single data item in the stream
  { key: "user-42", value: { action: "click", page: "/home" }, timestamp: 1709553600000 }

Stream: Unbounded, continuously appended sequence of records
  ──●──●──●──●──●──●──●──●──●──●──→  (infinite)

Table: A snapshot of the latest value for each key
  ┌──────────┬───────────────────┐
  │ Key      │ Value             │
  ├──────────┼───────────────────┤
  │ user-42  │ { clicks: 15 }   │
  │ user-99  │ { clicks: 8 }    │
  └──────────┴───────────────────┘

Stream ←→ Table Duality:
  Stream → Table: Apply each event as an update
  Table  → Stream: Emit each change as an event (CDC)

Topology: A DAG of processing operators
  Source → Filter → Map → GroupBy → Aggregate → Sink

Building a Stream Processing Engine from Scratch

Record and Topology Types

interface StreamRecord<K, V> {
  key: K;
  value: V;
  timestamp: number;
  offset: number;           // Position in source partition
  partition: number;
  topic: string;
  headers?: Map<string, string>;
}

interface ProcessorContext<K, V> {
  forward(record: StreamRecord<K, V>): void;
  commit(): Promise<void>;
  schedule(intervalMs: number, callback: () => void): void;
  getStateStore<S>(name: string): StateStore<S>;
  currentTimestamp(): number;
  taskId: string;
}

interface Processor<KIn, VIn, KOut, VOut> {
  init(context: ProcessorContext<KOut, VOut>): void;
  process(record: StreamRecord<KIn, VIn>): void;
  close(): void;
}

interface StateStore<V> {
  get(key: string): V | undefined;
  put(key: string, value: V): void;
  delete(key: string): void;
  range(from: string, to: string): Array<[string, V]>;
  all(): Array<[string, V]>;
  approximateNumEntries(): number;
  flush(): Promise<void>;
}

Stateful Processing Engine

class StreamProcessingEngine {
  private topology: ProcessorNode[] = [];
  private stateStores: Map<string, InMemoryStateStore<any>> = new Map();
  private committed: Map<string, number> = new Map(); // topic:partition → offset
  private running: boolean = false;
  
  constructor(
    private consumer: StreamConsumer,
    private producer: StreamProducer,
    private changelogTopic?: string   // For state store recovery
  ) {}

  addSource(
    name: string, 
    topics: string[], 
    processor: Processor<any, any, any, any>
  ): StreamProcessingEngine {
    this.topology.push({
      name,
      type: 'source',
      topics,
      processor,
      children: []
    });
    return this;
  }

  addProcessor(
    name: string, 
    parentName: string, 
    processor: Processor<any, any, any, any>,
    stateStoreNames?: string[]
  ): StreamProcessingEngine {
    const parent = this.findNode(parentName);
    if (!parent) throw new Error(`Parent ${parentName} not found`);
    
    const node: ProcessorNode = {
      name,
      type: 'processor',
      processor,
      children: [],
      stateStoreNames
    };
    
    parent.children.push(node);
    this.topology.push(node);
    return this;
  }

  addSink(
    name: string, 
    parentName: string, 
    outputTopic: string
  ): StreamProcessingEngine {
    const parent = this.findNode(parentName);
    if (!parent) throw new Error(`Parent ${parentName} not found`);
    
    const sinkProcessor: Processor<any, any, any, any> = {
      init: () => {},
      process: (record) => {
        this.producer.send(outputTopic, record);
      },
      close: () => {}
    };
    
    parent.children.push({
      name,
      type: 'sink',
      processor: sinkProcessor,
      children: [],
      outputTopic
    });
    
    return this;
  }

  addStateStore(name: string): StreamProcessingEngine {
    this.stateStores.set(name, new InMemoryStateStore(name));
    return this;
  }

  async start(): Promise<void> {
    this.running = true;
    
    // Recover state from changelog topic
    if (this.changelogTopic) {
      await this.recoverState();
    }
    
    // Initialize all processors
    for (const node of this.topology) {
      const context = this.createContext(node);
      node.processor.init(context);
    }
    
    // Subscribe to source topics
    const allTopics = this.topology
      .filter(n => n.type === 'source')
      .flatMap(n => n.topics || []);
    
    await this.consumer.subscribe(allTopics);
    
    // Main processing loop
    while (this.running) {
      const records = await this.consumer.poll(100); // 100ms poll timeout
      
      for (const record of records) {
        await this.processRecord(record);
      }
      
      // Periodic commit
      await this.commitOffsets();
    }
  }

  private async processRecord(record: StreamRecord<any, any>): Promise<void> {
    // Find matching source nodes
    const sourceNodes = this.topology.filter(
      n => n.type === 'source' && n.topics?.includes(record.topic)
    );
    
    for (const source of sourceNodes) {
      this.dispatchToNode(source, record);
    }
  }

  private dispatchToNode(node: ProcessorNode, record: StreamRecord<any, any>): void {
    const context = this.createContext(node);
    
    // Process the record
    node.processor.process(record);
    
    // Forward to children
    for (const child of node.children) {
      this.dispatchToNode(child, record);
    }
  }

  private createContext(node: ProcessorNode): ProcessorContext<any, any> {
    const engine = this;
    
    return {
      forward(record: StreamRecord<any, any>): void {
        for (const child of node.children) {
          engine.dispatchToNode(child, record);
        }
      },
      
      async commit(): Promise<void> {
        await engine.commitOffsets();
      },
      
      schedule(intervalMs: number, callback: () => void): void {
        setInterval(callback, intervalMs);
      },
      
      getStateStore<S>(name: string): StateStore<S> {
        const store = engine.stateStores.get(name);
        if (!store) throw new Error(`State store ${name} not found`);
        return store as StateStore<S>;
      },
      
      currentTimestamp: () => Date.now(),
      taskId: node.name
    };
  }

  private async commitOffsets(): Promise<void> {
    await this.consumer.commit();
    
    // Flush state stores to changelog
    if (this.changelogTopic) {
      for (const [name, store] of this.stateStores) {
        await store.flush();
      }
    }
  }

  private async recoverState(): Promise<void> {
    if (!this.changelogTopic) return;
    
    // Read changelog from beginning to rebuild state
    const changelogConsumer = this.consumer.createChangelogConsumer(this.changelogTopic);
    
    let record: StreamRecord<string, any> | null;
    while ((record = await changelogConsumer.readNext()) !== null) {
      const storeName = record.headers?.get('state-store');
      if (storeName && this.stateStores.has(storeName)) {
        const store = this.stateStores.get(storeName)!;
        if (record.value === null) {
          store.delete(record.key);
        } else {
          store.put(record.key, record.value);
        }
      }
    }
  }

  async stop(): Promise<void> {
    this.running = false;
    
    for (const node of this.topology) {
      node.processor.close();
    }
    
    await this.commitOffsets();
  }

  private findNode(name: string): ProcessorNode | undefined {
    return this.topology.find(n => n.name === name);
  }
}

interface ProcessorNode {
  name: string;
  type: 'source' | 'processor' | 'sink';
  topics?: string[];
  outputTopic?: string;
  processor: Processor<any, any, any, any>;
  children: ProcessorNode[];
  stateStoreNames?: string[];
}

class InMemoryStateStore<V> implements StateStore<V> {
  private data: Map<string, V> = new Map();
  private dirty: Map<string, V | null> = new Map(); // Changelog buffer
  
  constructor(private name: string) {}
  
  get(key: string): V | undefined {
    return this.data.get(key);
  }
  
  put(key: string, value: V): void {
    this.data.set(key, value);
    this.dirty.set(key, value);
  }
  
  delete(key: string): void {
    this.data.delete(key);
    this.dirty.set(key, null); // Tombstone
  }
  
  range(from: string, to: string): Array<[string, V]> {
    const results: Array<[string, V]> = [];
    for (const [key, value] of this.data) {
      if (key >= from && key <= to) {
        results.push([key, value]);
      }
    }
    return results.sort((a, b) => a[0].localeCompare(b[0]));
  }
  
  all(): Array<[string, V]> {
    return Array.from(this.data.entries());
  }
  
  approximateNumEntries(): number {
    return this.data.size;
  }
  
  async flush(): Promise<void> {
    // In production: write dirty entries to changelog topic
    this.dirty.clear();
  }
}

interface StreamConsumer {
  subscribe(topics: string[]): Promise<void>;
  poll(timeoutMs: number): Promise<StreamRecord<any, any>[]>;
  commit(): Promise<void>;
  createChangelogConsumer(topic: string): ChangelogConsumer;
}

interface StreamProducer {
  send(topic: string, record: StreamRecord<any, any>): Promise<void>;
}

interface ChangelogConsumer {
  readNext(): Promise<StreamRecord<string, any> | null>;
}

Windowing: Aggregating Bounded Slices of Unbounded Streams

Window Types:

Tumbling Window (fixed, non-overlapping):
──────┬──────┬──────┬──────┬──────→ time
│  W1  │  W2  │  W3  │  W4  │
│ 0-5m │ 5-10m│10-15m│15-20m│

Each event belongs to exactly ONE window.
Use: metrics per minute, hourly aggregations


Hopping/Sliding Window (fixed, overlapping):
──────┬────────────┬──────→ time
│     W1 (10min)   │
│  ┌──┤            │
│  │  W2 (10min)   │
│  │  ┌──┤        │
│  │  │  W3 (10min)│
  advance=5min, size=10min

Events can belong to MULTIPLE windows.
Use: moving averages, rolling counts


Session Window (dynamic, gap-based):
──●●●──gap──●●──gap──●●●●●●──gap──●●→
 │ S1 │     │S2│     │   S3   │   │S4│

Window closes when no events arrive for gap duration.
Use: user sessions, activity bursts


Global Window:
──────────────────────────────────→ time
│           Everything             │

Single window for all time.
Use with triggers: emit every N events or every N seconds.

Window Implementation

interface Window {
  start: number;
  end: number;
}

interface WindowAssigner {
  assignWindows(timestamp: number): Window[];
  windowSize: number;
}

class TumblingWindowAssigner implements WindowAssigner {
  constructor(public windowSize: number) {}
  
  assignWindows(timestamp: number): Window[] {
    const start = Math.floor(timestamp / this.windowSize) * this.windowSize;
    return [{ start, end: start + this.windowSize }];
  }
}

class HoppingWindowAssigner implements WindowAssigner {
  constructor(
    public windowSize: number, 
    private advance: number
  ) {}
  
  assignWindows(timestamp: number): Window[] {
    const windows: Window[] = [];
    
    // Find all windows this timestamp falls into
    const firstWindowStart = Math.floor(timestamp / this.advance) * this.advance - this.windowSize + this.advance;
    
    for (let start = firstWindowStart; start <= timestamp; start += this.advance) {
      if (start + this.windowSize > timestamp) {
        windows.push({ start, end: start + this.windowSize });
      }
    }
    
    return windows;
  }
}

class SessionWindowAssigner implements WindowAssigner {
  windowSize: number; // Actually the gap duration
  
  constructor(private gapMs: number) {
    this.windowSize = gapMs;
  }
  
  // Session windows are assigned dynamically — they merge
  assignWindows(timestamp: number): Window[] {
    return [{ start: timestamp, end: timestamp + this.gapMs }];
  }
}

type AggregatorFn<V, A> = {
  initialValue: () => A;
  add: (accumulator: A, value: V) => A;
  remove?: (accumulator: A, value: V) => A; // For hopping windows
  merge?: (a: A, b: A) => A;               // For session window merging
};

class WindowedAggregator<K, V, A> {
  private windowStore: Map<string, A> = new Map(); // "key:windowStart" → aggregate
  private windowTimers: Map<string, NodeJS.Timeout> = new Map();
  
  constructor(
    private assigner: WindowAssigner,
    private aggregator: AggregatorFn<V, A>,
    private watermarkTracker: WatermarkTracker,
    private onWindowClose: (key: K, window: Window, result: A) => void,
    private allowedLateness: number = 0
  ) {}

  process(key: K, value: V, timestamp: number): void {
    const windows = this.assigner.assignWindows(timestamp);
    
    for (const window of windows) {
      const storeKey = `${String(key)}:${window.start}`;
      
      // Check if window is still open (considering watermark + lateness)
      const watermark = this.watermarkTracker.currentWatermark();
      if (window.end + this.allowedLateness < watermark) {
        // Late event beyond allowed lateness — drop or send to side output
        continue;
      }
      
      // Get or initialize aggregate
      let aggregate = this.windowStore.get(storeKey);
      if (aggregate === undefined) {
        aggregate = this.aggregator.initialValue();
        this.scheduleWindowClose(key, window, storeKey);
      }
      
      // Apply aggregation
      aggregate = this.aggregator.add(aggregate, value);
      this.windowStore.set(storeKey, aggregate);
    }
    
    // Advance watermark
    this.watermarkTracker.update(timestamp);
  }

  private scheduleWindowClose(key: K, window: Window, storeKey: string): void {
    // Window closes when watermark passes window.end + lateness
    const checkClose = () => {
      const watermark = this.watermarkTracker.currentWatermark();
      
      if (watermark >= window.end + this.allowedLateness) {
        const result = this.windowStore.get(storeKey);
        if (result !== undefined) {
          this.onWindowClose(key, window, result);
          this.windowStore.delete(storeKey);
          this.windowTimers.delete(storeKey);
        }
      } else {
        // Check again later
        const timer = setTimeout(checkClose, 1000);
        this.windowTimers.set(storeKey, timer);
      }
    };
    
    const timer = setTimeout(checkClose, this.assigner.windowSize);
    this.windowTimers.set(storeKey, timer);
  }

  // Session window merging: merge overlapping sessions
  mergeSessionWindows(key: K): void {
    if (!(this.assigner instanceof SessionWindowAssigner)) return;
    
    const prefix = `${String(key)}:`;
    const sessions: Array<{ start: number; end: number; storeKey: string; value: A }> = [];
    
    for (const [storeKey, value] of this.windowStore) {
      if (storeKey.startsWith(prefix)) {
        const start = parseInt(storeKey.slice(prefix.length));
        sessions.push({
          start,
          end: start + this.assigner.windowSize,
          storeKey,
          value
        });
      }
    }
    
    // Sort by start time
    sessions.sort((a, b) => a.start - b.start);
    
    // Merge overlapping sessions
    for (let i = 0; i < sessions.length - 1; i++) {
      if (sessions[i].end >= sessions[i + 1].start) {
        // Merge session i+1 into session i
        sessions[i].end = Math.max(sessions[i].end, sessions[i + 1].end);
        
        if (this.aggregator.merge) {
          sessions[i].value = this.aggregator.merge(sessions[i].value, sessions[i + 1].value);
        }
        
        // Remove session i+1
        this.windowStore.delete(sessions[i + 1].storeKey);
        sessions.splice(i + 1, 1);
        i--; // Recheck current position
      }
    }
  }
}

Watermarks & Event-Time Processing

The Problem: Events Arrive Out of Order

Event time:  10:01  10:03  10:02  10:05  10:04  10:06
Arrival time: 10:01  10:02  10:02  10:03  10:06  10:06
                                          ↑
                                     10:04 arrived late!

If we close the [10:00-10:05) window at arrival time 10:05,
we miss the event with event time 10:04.


Watermark: "I believe no events with timestamp < W will arrive"

Events:    ●(10:01) ●(10:03) ●(10:02) ●(10:05) ●(10:04) ●(10:06)
                                                      ↑
Watermark: ───10:00────10:01────10:01────10:03────10:03────10:04──→

Watermark advances conservatively.
Window [10:00-10:05) closes when watermark reaches 10:05.
Late events (after watermark) either:
  - Dropped (simple)
  - Processed with allowed lateness (window stays open longer)
  - Sent to side output (dead letter)

Watermark Implementation

class WatermarkTracker {
  private watermark: number = -Infinity;
  private maxOutOfOrderness: number; // Expected max lateness
  private lastEventTime: number = 0;
  
  constructor(maxOutOfOrdernessMs: number = 5000) {
    this.maxOutOfOrderness = maxOutOfOrdernessMs;
  }
  
  update(eventTimestamp: number): void {
    this.lastEventTime = Math.max(this.lastEventTime, eventTimestamp);
    
    // Watermark = max event time seen - max out-of-orderness
    const proposedWatermark = this.lastEventTime - this.maxOutOfOrderness;
    
    // Watermark only advances (never goes backward)
    this.watermark = Math.max(this.watermark, proposedWatermark);
  }
  
  currentWatermark(): number {
    return this.watermark;
  }
  
  // Periodic watermark advancement when no events arrive
  advanceOnIdle(currentProcessingTime: number): void {
    // If no events for 2x max out-of-orderness, advance watermark
    // This prevents windows from staying open forever during idle periods
    const idleThreshold = this.maxOutOfOrderness * 2;
    if (currentProcessingTime - this.lastEventTime > idleThreshold) {
      this.watermark = currentProcessingTime - this.maxOutOfOrderness;
    }
  }
}

// Per-partition watermark tracking
class PartitionedWatermarkTracker {
  private partitionWatermarks: Map<number, number> = new Map();
  
  updatePartition(partition: number, eventTimestamp: number, maxLag: number): void {
    const current = this.partitionWatermarks.get(partition) || -Infinity;
    const proposed = eventTimestamp - maxLag;
    this.partitionWatermarks.set(partition, Math.max(current, proposed));
  }
  
  // Global watermark = minimum of all partition watermarks
  globalWatermark(): number {
    if (this.partitionWatermarks.size === 0) return -Infinity;
    return Math.min(...this.partitionWatermarks.values());
  }
}

Exactly-Once Semantics

Delivery Guarantees:

At-Most-Once:
  Producer → Broker → Consumer
  Fire and forget. May lose messages.
  Consumer processes, does NOT commit offset → crash → skip message


At-Least-Once:
  Producer → Broker → Consumer
  Producer retries on failure. Consumer commits AFTER processing.
  If consumer crashes after processing but before commit → reprocess
  
  Process → [crash] → Restart → Process AGAIN → Commit
  Side effects happen TWICE


Exactly-Once:
  Producer → Broker → Consumer
  Each message is processed exactly once, even with failures.
  Requires: idempotent producers + transactional consumers
  
  Kafka's approach:
  1. Idempotent producer: broker deduplicates by producer ID + sequence #
  2. Transactions: atomically write output + commit input offset
  3. Consumer reads only COMMITTED messages (isolation.level=read_committed)

Exactly-Once Processing Implementation

interface TransactionalProducer {
  initTransactions(): Promise<void>;
  beginTransaction(): void;
  send(topic: string, record: StreamRecord<any, any>): Promise<void>;
  sendOffsetsToTransaction(offsets: Map<string, number>): void;
  commitTransaction(): Promise<void>;
  abortTransaction(): Promise<void>;
}

class ExactlyOnceProcessor<KIn, VIn, KOut, VOut> {
  private txProducer: TransactionalProducer;
  private consumer: StreamConsumer;
  private processor: Processor<KIn, VIn, KOut, VOut>;
  private outputBuffer: StreamRecord<KOut, VOut>[] = [];
  private running: boolean = false;
  
  constructor(
    txProducer: TransactionalProducer,
    consumer: StreamConsumer,
    processor: Processor<KIn, VIn, KOut, VOut>,
    private outputTopic: string,
    private commitIntervalMs: number = 200
  ) {
    this.txProducer = txProducer;
    this.consumer = consumer;
    this.processor = processor;
  }

  async start(): Promise<void> {
    this.running = true;
    await this.txProducer.initTransactions();
    
    const context = this.createContext();
    this.processor.init(context);
    
    let lastCommit = Date.now();
    
    while (this.running) {
      const records = await this.consumer.poll(100);
      
      for (const record of records) {
        this.processor.process(record as StreamRecord<KIn, VIn>);
      }
      
      // Commit periodically (not per-record for performance)
      if (Date.now() - lastCommit >= this.commitIntervalMs && this.outputBuffer.length > 0) {
        await this.commitTransaction(records);
        lastCommit = Date.now();
      }
    }
  }

  private async commitTransaction(inputRecords: StreamRecord<any, any>[]): Promise<void> {
    try {
      // Begin atomic transaction
      this.txProducer.beginTransaction();
      
      // Step 1: Send all output records within transaction
      for (const record of this.outputBuffer) {
        await this.txProducer.send(this.outputTopic, record);
      }
      
      // Step 2: Commit input offsets within SAME transaction
      const offsets = new Map<string, number>();
      for (const record of inputRecords) {
        const key = `${record.topic}:${record.partition}`;
        const current = offsets.get(key) || -1;
        offsets.set(key, Math.max(current, record.offset + 1));
      }
      this.txProducer.sendOffsetsToTransaction(offsets);
      
      // Step 3: Commit transaction atomically
      // Output records + offset commits are ALL committed together
      await this.txProducer.commitTransaction();
      
      this.outputBuffer = [];
    } catch (error) {
      // Abort transaction — nothing was committed
      await this.txProducer.abortTransaction();
      this.outputBuffer = [];
      throw error;
    }
  }

  private createContext(): ProcessorContext<KOut, VOut> {
    return {
      forward: (record: StreamRecord<KOut, VOut>) => {
        this.outputBuffer.push(record);
      },
      commit: async () => {}, // Handled by commitTransaction
      schedule: (interval, callback) => setInterval(callback, interval),
      getStateStore: (name) => { throw new Error('State stores not yet supported'); },
      currentTimestamp: () => Date.now(),
      taskId: 'eo-processor'
    };
  }

  async stop(): Promise<void> {
    this.running = false;
    this.processor.close();
  }
}

Backpressure: Controlling Flow When Consumer Is Slower Than Producer

The Problem:

Producer: 100K events/sec ═══════════╗
                                     ║
Consumer: 10K events/sec ◄═══════════╝
                                     ║
                              Buffer grows unbounded
                              → OOM crash
                              → Message loss


Backpressure Strategies:

1. Drop (Lossy):
   Producer ══●●●●●●●●══╗
                         ║ buffer full
   Drop new events → ●●●╳╳╳╳╳
   Consumer ◄════●●●═════╝
   Use: Metrics, monitoring (sampling OK)

2. Buffer + Spill to Disk:
   Producer ══●●●●●●●●══╗
                         ║ memory buffer full
   Spill →  [disk buffer]║
   Consumer ◄════●●●═════╝ ← reads from memory, then disk
   Use: Log processing, batch jobs

3. Rate Limiting (Push-Back):
   Producer ══●●──SLOW DOWN──●●══╗
                                  ║ consumer signals "slow down"
   Consumer ◄════●●═══════════════╝
   Use: TCP flow control, Reactive Streams

4. Dynamic Scaling:
   Producer ══●●●●●●●●══╗
                         ║ consumer lag increasing
   Auto-scale consumers: ║
   Consumer 1 ◄════●●════╝
   Consumer 2 ◄════●●════╝ (new)
   Consumer 3 ◄════●●════╝ (new)
   Use: Kafka consumer groups, Kubernetes HPA

Backpressure Implementation

class BackpressureController {
  private bufferSize: number = 0;
  private highWatermark: number;
  private lowWatermark: number;
  private paused: boolean = false;
  
  // Callbacks
  private onPause?: () => void;
  private onResume?: () => void;
  private onDrop?: (record: any) => void;
  
  constructor(options: {
    highWatermark: number;  // Pause when buffer exceeds this
    lowWatermark: number;   // Resume when buffer drops below this
    strategy: 'pause' | 'drop' | 'buffer';
    onPause?: () => void;
    onResume?: () => void;
    onDrop?: (record: any) => void;
  }) {
    this.highWatermark = options.highWatermark;
    this.lowWatermark = options.lowWatermark;
    this.onPause = options.onPause;
    this.onResume = options.onResume;
    this.onDrop = options.onDrop;
  }

  recordReceived(size: number = 1): boolean {
    this.bufferSize += size;
    
    if (this.bufferSize >= this.highWatermark && !this.paused) {
      this.paused = true;
      this.onPause?.();
      return false; // Signal: stop producing
    }
    
    return true; // OK to continue
  }

  recordProcessed(size: number = 1): void {
    this.bufferSize = Math.max(0, this.bufferSize - size);
    
    if (this.paused && this.bufferSize <= this.lowWatermark) {
      this.paused = false;
      this.onResume?.();
    }
  }

  isPaused(): boolean {
    return this.paused;
  }

  currentLoad(): number {
    return this.bufferSize / this.highWatermark;
  }
}

// Reactive Streams-style pull-based backpressure
class PullBasedStream<T> {
  private buffer: T[] = [];
  private demand: number = 0; // Outstanding demand from subscriber
  private subscriber?: StreamSubscriber<T>;
  
  subscribe(subscriber: StreamSubscriber<T>): void {
    this.subscriber = subscriber;
    subscriber.onSubscribe({
      request: (n: number) => {
        this.demand += n;
        this.drain();
      },
      cancel: () => {
        this.subscriber = undefined;
      }
    });
  }

  emit(item: T): void {
    this.buffer.push(item);
    this.drain();
  }

  private drain(): void {
    while (this.demand > 0 && this.buffer.length > 0 && this.subscriber) {
      const item = this.buffer.shift()!;
      this.demand--;
      this.subscriber.onNext(item);
    }
  }
}

interface StreamSubscriber<T> {
  onSubscribe(subscription: StreamSubscription): void;
  onNext(item: T): void;
  onError(error: Error): void;
  onComplete(): void;
}

interface StreamSubscription {
  request(n: number): void;
  cancel(): void;
}

// Consumer lag monitoring for auto-scaling decisions
class ConsumerLagMonitor {
  private lagSamples: Map<string, number[]> = new Map();
  private readonly maxSamples = 60; // 1 minute of samples (1/sec)
  
  recordLag(consumerGroup: string, partition: number, lag: number): void {
    const key = `${consumerGroup}:${partition}`;
    
    if (!this.lagSamples.has(key)) {
      this.lagSamples.set(key, []);
    }
    
    const samples = this.lagSamples.get(key)!;
    samples.push(lag);
    
    if (samples.length > this.maxSamples) {
      samples.shift();
    }
  }

  shouldScaleUp(consumerGroup: string): ScaleDecision {
    let totalLag = 0;
    let increasing = 0;
    let partitionCount = 0;
    
    for (const [key, samples] of this.lagSamples) {
      if (!key.startsWith(consumerGroup + ':')) continue;
      
      partitionCount++;
      totalLag += samples[samples.length - 1] || 0;
      
      // Check if lag is increasing (linear regression slope > 0)
      if (samples.length >= 10) {
        const recentAvg = average(samples.slice(-5));
        const olderAvg = average(samples.slice(-10, -5));
        if (recentAvg > olderAvg * 1.2) increasing++;
      }
    }
    
    return {
      shouldScale: increasing > partitionCount * 0.5, // >50% partitions growing
      totalLag,
      partitionCount,
      increasingPartitions: increasing,
      suggestedConsumers: Math.min(
        partitionCount, // Can't have more consumers than partitions
        Math.ceil(totalLag / 10000) // 1 consumer per 10K lag
      )
    };
  }
}

interface ScaleDecision {
  shouldScale: boolean;
  totalLag: number;
  partitionCount: number;
  increasingPartitions: number;
  suggestedConsumers: number;
}

function average(nums: number[]): number {
  return nums.reduce((sum, n) => sum + n, 0) / nums.length;
}

Common Stream Processing Operators

// Filter: drop records that don't match predicate
class FilterProcessor<K, V> implements Processor<K, V, K, V> {
  private ctx!: ProcessorContext<K, V>;
  
  constructor(private predicate: (key: K, value: V) => boolean) {}
  
  init(context: ProcessorContext<K, V>): void { this.ctx = context; }
  
  process(record: StreamRecord<K, V>): void {
    if (this.predicate(record.key, record.value)) {
      this.ctx.forward(record);
    }
  }
  
  close(): void {}
}

// Map: transform each record
class MapProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
  private ctx!: ProcessorContext<KOut, VOut>;
  
  constructor(private mapper: (key: KIn, value: VIn) => { key: KOut; value: VOut }) {}
  
  init(context: ProcessorContext<KOut, VOut>): void { this.ctx = context; }
  
  process(record: StreamRecord<KIn, VIn>): void {
    const { key, value } = this.mapper(record.key, record.value);
    this.ctx.forward({ ...record, key, value } as any);
  }
  
  close(): void {}
}

// FlatMap: one input → zero or more outputs
class FlatMapProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
  private ctx!: ProcessorContext<KOut, VOut>;
  
  constructor(private mapper: (key: KIn, value: VIn) => Array<{ key: KOut; value: VOut }>) {}
  
  init(context: ProcessorContext<KOut, VOut>): void { this.ctx = context; }
  
  process(record: StreamRecord<KIn, VIn>): void {
    const results = this.mapper(record.key, record.value);
    for (const { key, value } of results) {
      this.ctx.forward({ ...record, key, value } as any);
    }
  }
  
  close(): void {}
}

// Stateful aggregate with state store
class AggregateProcessor<K, V, A> implements Processor<K, V, K, A> {
  private ctx!: ProcessorContext<K, A>;
  private store!: StateStore<A>;
  
  constructor(
    private storeName: string,
    private initializer: () => A,
    private aggregator: (key: K, value: V, aggregate: A) => A
  ) {}
  
  init(context: ProcessorContext<K, A>): void {
    this.ctx = context;
    this.store = context.getStateStore(this.storeName);
  }
  
  process(record: StreamRecord<K, V>): void {
    const keyStr = String(record.key);
    let current = this.store.get(keyStr);
    
    if (current === undefined) {
      current = this.initializer();
    }
    
    const updated = this.aggregator(record.key, record.value, current);
    this.store.put(keyStr, updated);
    
    this.ctx.forward({ ...record, value: updated } as any);
  }
  
  close(): void {}
}

// Join: combine two streams by key within a time window
class StreamStreamJoin<K, V1, V2, VOut> {
  private buffer1: Map<string, Array<{ value: V1; timestamp: number }>> = new Map();
  private buffer2: Map<string, Array<{ value: V2; timestamp: number }>> = new Map();
  
  constructor(
    private joinWindow: number, // ms
    private joiner: (key: K, left: V1, right: V2) => VOut,
    private onResult: (key: K, result: VOut, timestamp: number) => void
  ) {}

  processLeft(key: K, value: V1, timestamp: number): void {
    const keyStr = String(key);
    
    // Store in left buffer
    if (!this.buffer1.has(keyStr)) this.buffer1.set(keyStr, []);
    this.buffer1.get(keyStr)!.push({ value, timestamp });
    
    // Check right buffer for matching records within window
    const rightRecords = this.buffer2.get(keyStr) || [];
    for (const right of rightRecords) {
      if (Math.abs(right.timestamp - timestamp) <= this.joinWindow) {
        const result = this.joiner(key, value, right.value);
        this.onResult(key, result, Math.max(timestamp, right.timestamp));
      }
    }
    
    this.cleanExpired(this.buffer1, timestamp);
  }

  processRight(key: K, value: V2, timestamp: number): void {
    const keyStr = String(key);
    
    if (!this.buffer2.has(keyStr)) this.buffer2.set(keyStr, []);
    this.buffer2.get(keyStr)!.push({ value, timestamp });
    
    const leftRecords = this.buffer1.get(keyStr) || [];
    for (const left of leftRecords) {
      if (Math.abs(left.timestamp - timestamp) <= this.joinWindow) {
        const result = this.joiner(key, left.value, value);
        this.onResult(key, result, Math.max(timestamp, left.timestamp));
      }
    }
    
    this.cleanExpired(this.buffer2, timestamp);
  }

  private cleanExpired(buffer: Map<string, Array<{ timestamp: number }>>, currentTime: number): void {
    for (const [key, records] of buffer) {
      const filtered = records.filter(r => currentTime - r.timestamp <= this.joinWindow * 2);
      if (filtered.length === 0) {
        buffer.delete(key);
      } else {
        buffer.set(key, filtered as any);
      }
    }
  }
}

Comparing Stream Processing Frameworks

┌──────────────┬───────────────┬───────────────┬───────────────┬───────────────┐
│ Feature      │ Kafka Streams │ Apache Flink  │ Spark Streaming│ Pulsar Func. │
├──────────────┼───────────────┼───────────────┼───────────────┼───────────────┤
│ Model        │ Event-at-a-   │ Event-at-a-   │ Micro-batch   │ Event/Function│
│              │ time          │ time          │ (100ms+)      │               │
│ Deployment   │ Library (JVM) │ Cluster       │ Cluster       │ Serverless /  │
│              │               │ (standalone)  │ (YARN/K8s)    │ Library       │
│ State        │ RocksDB local │ RocksDB +     │ In-memory +   │ BookKeeper    │
│              │ + changelog   │ checkpoints   │ checkpoints   │               │
│ Exactly-Once │ Transactions  │ Checkpoints + │ WAL + check-  │ Effectively   │
│              │               │ 2PC sinks     │ points        │ once          │
│ Latency      │ ~1ms          │ ~1ms          │ ~100ms-1s     │ ~5ms          │
│ Throughput   │ ~500K/s       │ Millions/s    │ Millions/s    │ ~100K/s       │
│ Windowing    │ Time, Session │ Time, Session,│ Time only     │ Basic         │
│              │               │ Count, Custom │               │               │
│ Join Support │ KStream-      │ Full (stream- │ Stream-stream │ Limited       │
│              │ KTable, etc.  │ stream, table)│               │               │
│ Scaling      │ Kafka consumer│ TaskManager   │ Executors     │ Instances     │
│              │ groups        │ parallelism   │ parallelism   │               │
│ Fault Tol.   │ Consumer      │ Checkpoints + │ RDD lineage   │ Auto-restart  │
│              │ rebalance     │ savepoints    │               │               │
│ Best For     │ Kafka-centric │ Complex event │ ML pipelines, │ Simple event  │
│              │ microservices │ processing    │ batch + stream│ processing    │
└──────────────┴───────────────┴───────────────┴───────────────┴───────────────┘

Interview Questions

Q1: Explain the difference between event-time and processing-time in stream processing. Why does it matter?

Processing time is when the event reaches the processing engine — Date.now() on the server. Event time is when the event actually occurred — the timestamp embedded in the event payload. The distinction matters because events arrive out of order. A mobile app might batch events and send them 30 seconds late. Network delays, retries, and partition rebalancing all cause ordering disruption. If you window by processing time, events with event-time 10:01 might land in the 10:02 window because they arrived late. This produces incorrect aggregations. Event-time processing uses watermarks to track progress — the watermark represents the engine's belief that all events before time W have arrived. Windows close based on watermark advancement, not wall-clock time. The trade-off: event-time processing is more complex (requires watermark tracking, late event handling, allowed lateness configuration) but produces correct results. Processing-time is simpler and lower latency but produces approximate results.

Q2: How does Kafka achieve exactly-once semantics in stream processing?

Kafka's exactly-once has three components working together. (1) Idempotent producer: Each producer gets a unique PID (producer ID) from the broker. Every message carries a sequence number. The broker deduplicates by (PID, partition, sequence) — if it sees a duplicate sequence number, it discards the message but returns success. This prevents double-writes due to producer retries. (2) Transactions: A producer can start a transaction that atomically writes output messages AND commits consumer offsets. The broker maintains a transaction coordinator that tracks the transaction state. On commit, all writes become visible atomically. On abort, none are visible. (3) Read committed consumers: Consumers configured with isolation.level=read_committed only see messages from committed transactions. Aborted transaction messages are invisible. The combination means: consumer reads input → processes → producer writes output + commits input offset atomically. If the processor crashes, the transaction is either committed (all visible) or not (nothing visible) — no partial state.

Q3: What is backpressure in stream processing, and how do you handle it?

Backpressure occurs when a downstream component processes slower than the upstream produces. Without control, internal buffers grow until OOM. Four strategies: (1) Drop: Discard excess events (lossy). Suitable for metrics where sampling is acceptable. (2) Buffer with bounds: Accept events into a bounded buffer. When full, block the producer or spill to disk. This absorbs temporary bursts but OOMs on sustained overload. (3) Rate limiting / push-back: The consumer signals the producer to slow down. TCP does this natively with window sizes. Reactive Streams (Java/Kotlin) use request(n) where the subscriber tells the publisher how many items it can accept. Flink propagates backpressure through the operator chain via network buffers — when a downstream buffer is full, the upstream operator blocks on emit(). (4) Auto-scaling: Monitor consumer lag and add more consumer instances. Kafka consumer groups rebalance partitions across new consumers automatically. This is the only strategy that actually increases throughput rather than just controlling flow.

Q4: How do windowed aggregations work, and what happens with late-arriving events?

Windowed aggregations group events into time-bounded buckets and compute aggregates per bucket. Tumbling windows are fixed, non-overlapping intervals (e.g., every 5 minutes). Hopping windows overlap (10-minute window advancing every 5 minutes). Session windows are dynamic — they close after a gap of inactivity. When a window closes, it emits its aggregate result. Windows close based on watermark advancement — when the watermark passes the window's end time, the window fires. Late events (arriving after watermark but within the allowed lateness) trigger an updated result for the already-fired window — this updates downstream. Events arriving beyond the allowed lateness are dropped or sent to a side output. The allowed lateness creates a trade-off: larger lateness means better completeness but higher memory (more open windows) and higher output latency. In practice, set allowed lateness based on your domain — real-time dashboards might tolerate 0 lateness, billing aggregations might allow 1 hour.

Q5: Explain the stream-table duality. How is it used in practice?

A stream is an unbounded sequence of immutable events. A table is a mutable snapshot of the latest value per key. They're dual representations of the same data: (1) Stream → Table: Apply each stream event as an update to the table. INSERT key=A, value=1 then UPDATE key=A, value=2 gives a table with A=2. This is called "materializing" a stream. (2) Table → Stream: Emit every change to the table as a stream event. This is Change Data Capture (CDC). In Kafka Streams, a KTable is a compacted topic where each key retains only the latest value. When you join a KStream (events) with a KTable (lookup), the KTable acts like a dimension table that's continuously updated. Example: join a stream of order events with a customer KTable to enrich orders with customer details. The KTable is backed by a local RocksDB store for fast lookups. As the customer topic receives updates, the local store is updated, and subsequent joins use the latest customer data. This pattern replaces traditional batch ETL joins with real-time enrichment.


Real-World Problems & How to Solve Them

Problem 1: Window counts are wrong when events arrive late

Symptom: Hourly dashboards show spikes and dips that disappear after backfills.

Root cause: Windows are keyed by processing time instead of event time; out-of-order events are assigned to the wrong bucket.

Fix — extract event-time and drive watermark from observed timestamps:

interface RawClick {
  userId: string;
  occurredAtIso: string;
  page: string;
}

function toRecord(
  raw: RawClick,
  offset: number,
  partition: number
): StreamRecord<string, RawClick> {
  return {
    key: raw.userId,
    value: raw,
    timestamp: Date.parse(raw.occurredAtIso),
    offset,
    partition,
    topic: 'clicks',
  };
}

class WatermarkTracker {
  private maxSeenTs = 0;
  constructor(private readonly maxOutOfOrderMs: number) {}

  update(eventTs: number): number {
    this.maxSeenTs = Math.max(this.maxSeenTs, eventTs);
    return this.maxSeenTs - this.maxOutOfOrderMs;
  }
}

Problem 2: Late events are silently dropped

Symptom: Audit totals don’t match because delayed mobile events never appear in aggregates.

Root cause: Allowed lateness is zero, so post-watermark events are discarded.

Fix — keep a bounded lateness window and route too-late records to a dead-letter stream:

function routeLateEvent(
  record: StreamRecord<string, any>,
  windowEndMs: number,
  watermarkMs: number,
  allowedLatenessMs: number
): 'on-time' | 'late-update' | 'drop' {
  if (record.timestamp <= watermarkMs - allowedLatenessMs) return 'drop';
  if (record.timestamp <= windowEndMs && record.timestamp < watermarkMs) return 'late-update';
  return 'on-time';
}

// usage
const route = routeLateEvent(record, windowEnd, currentWatermark, 5 * 60_000);
if (route === 'drop') deadLetterProducer.send('events.too-late', record);

Problem 3: Duplicates appear after consumer restarts

Symptom: Revenue counters jump whenever the stream app crashes and recovers.

Root cause: Offset commits happen separately from sink writes, creating at-least-once effects.

Fix — use idempotent sink writes with deterministic event IDs:

interface AggregatedRow {
  eventId: string;
  key: string;
  windowStart: number;
  count: number;
}

async function writeIdempotent(
  db: { upsertByEventId: (row: AggregatedRow) => Promise<void> },
  row: AggregatedRow
): Promise<void> {
  await db.upsertByEventId(row);
}

function makeEventId(record: StreamRecord<string, any>): string {
  return `${record.topic}:${record.partition}:${record.offset}`;
}

Problem 4: Stateful operators run out of memory

Symptom: JVM/Node memory climbs indefinitely in long-running jobs.

Root cause: Closed windows are never evicted; state store retains expired keys forever.

Fix — periodically purge state by watermark:

interface WindowAgg {
  windowEnd: number;
  count: number;
}

function evictClosedWindows(
  store: StateStore<WindowAgg>,
  watermarkMs: number,
  allowedLatenessMs: number
): void {
  for (const [key, agg] of store.all()) {
    if (agg.windowEnd + allowedLatenessMs <= watermarkMs) {
      store.delete(key);
    }
  }
}

Problem 5: Pipeline stalls under burst traffic

Symptom: Source lag grows rapidly and CPU usage drops in downstream operators.

Root cause: Unbounded in-memory queues absorb bursts until GC thrashes; no backpressure contract.

Fix — add bounded queues and await drain before reading more:

class BoundedQueue<T> {
  private items: T[] = [];
  constructor(private readonly maxSize: number) {}

  async push(item: T): Promise<void> {
    while (this.items.length >= this.maxSize) {
      await new Promise((r) => setTimeout(r, 5));
    }
    this.items.push(item);
  }

  pop(): T | undefined {
    return this.items.shift();
  }
}

Problem 6: Wrong results after failover recovery

Symptom: After restart, the app replays from the right offset but reconstructed state is stale.

Root cause: Offsets are committed before flushing local state/changelog, breaking checkpoint atomicity.

Fix — flush state first, then commit offsets in one checkpoint routine:

async function commitCheckpoint(
  stores: Array<StateStore<unknown>>,
  commitOffsets: () => Promise<void>
): Promise<void> {
  for (const store of stores) {
    await store.flush();
  }
  await commitOffsets();
}

Problem 7: One partition lags while others are idle

Symptom: End-to-end latency spikes for a few keys, even though cluster utilization looks low.

Root cause: Hot keys map to a single partition and overload one task.

Fix — shard hot keys with stable salting before partitioning:

function shardKey(key: string, shards: number): string {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  const shard = hash % shards;
  return `${key}#${shard}`;
}

// producer key for partitioning
const partitionKey = shardKey(record.key, 8);

Key Takeaways

  1. Stream processing handles unbounded data in real-time: Unlike batch processing, stream engines process events as they arrive with millisecond latency.

  2. Event time ≠ processing time: Always use event-time for correct aggregations. Watermarks track progress and trigger window closures.

  3. Exactly-once requires coordination across the pipeline: Kafka achieves it with idempotent producers + transactions + read-committed consumers. Flink uses distributed snapshots (Chandy-Lamport algorithm).

  4. Windowing is the core abstraction for bounded aggregations: Tumbling (fixed, non-overlapping), hopping (fixed, overlapping), and session (dynamic, gap-based) cover most use cases.

  5. Watermarks balance completeness vs latency: Conservative watermarks wait longer for late events (more complete, higher latency). Aggressive watermarks fire sooner (lower latency, more drops).

  6. Backpressure prevents OOM in unbalanced pipelines: Push-back (Reactive Streams), bounded buffers, and auto-scaling are the primary strategies. Flink and Kafka Streams handle this automatically.

  7. State is the hardest part of stream processing: Stateful operators (aggregations, joins) need fault-tolerant state stores. Kafka Streams uses RocksDB + changelog topics. Flink uses checkpoints to S3/HDFS.

  8. Stream-table duality enables real-time enrichment: Materialize a stream into a table for lookups. Emit table changes as a stream for downstream. This replaces batch ETL joins.

  9. Kafka Streams is a library, not a cluster: It runs inside your application as a dependency — no separate infrastructure. Scaling = adding more application instances.

  10. Choose the right tool for complexity: Simple event routing → Pulsar Functions. Kafka-centric apps → Kafka Streams. Complex event processing with custom windowing → Flink. ML + batch + stream → Spark Structured Streaming.

What did you think?

© 2026 Vidhya Sagar Thakur. All rights reserved.