Building a Real-Time Frontend System at Scale
Beyond WebSockets: Engineering Reliable Bidirectional Communication
Real-time features—live notifications, collaborative editing, presence indicators, streaming updates—have become table stakes for modern applications. But scaling real-time systems introduces challenges that don't exist in request-response architectures: persistent connection management, backpressure handling, state reconciliation after disconnects, and maintaining consistency across distributed clients.
This article presents production-ready patterns for building real-time frontends that stay reliable and consistent when the backend behind them serves millions of concurrent connections.
Real-Time Architecture Overview
┌─────────────────────────────────────────────────────────────────────────────┐
│ Real-Time System Architecture │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Clients │
│ ─────── │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Browser 1│ │Browser 2│ │Browser 3│ │Mobile │ │
│ │ │ │ │ │ │ │ │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ └────────────┴─────┬──────┴────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Load Balancer (Layer 7) │ │
│ │ Sticky sessions by connection ID │ │
│ └──────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────┼─────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ WS Gateway 1 │ │ WS Gateway 2 │ │ WS Gateway 3 │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │Connections│ │ │ │Connections│ │ │ │Connections│ │ │
│ │ │ 50,000 │ │ │ │ 50,000 │ │ │ │ 50,000 │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └─────────────────────┼─────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Redis Pub/Sub Cluster │ │
│ │ (Cross-gateway message routing + Presence) │ │
│ └──────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Application Services │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │ │
│ │ │ Chat Service │ │Notifications │ │ Collaborative Editing │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Connection Management
Robust WebSocket Client
// src/realtime/connection.ts
/** Lifecycle of the underlying WebSocket as reported through `onStateChange`. */
type ConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting';

/** Settings for {@link RealtimeConnection}. All durations are in milliseconds. */
interface ConnectionConfig {
  /** WebSocket endpoint; the auth token is appended as the `token` query parameter. */
  url: string;
  /** Sub-protocols forwarded to the `WebSocket` constructor. */
  protocols?: string[];
  /** Automatic reconnection after abnormal closes (any code other than 1000, 1001 or 4001). */
  reconnect: {
    enabled: boolean;
    /** Retries allowed before giving up; the counter resets after each successful connect. */
    maxAttempts: number;
    /** Delay before the first retry; doubled for every further attempt. */
    baseDelay: number;
    /** Cap on the exponential delay, applied before jitter. */
    maxDelay: number;
    /** Relative jitter: the delay is scaled by a random factor in [1 - j/2, 1 + j/2). */
    jitterFactor: number;
  };
  heartbeat: {
    /** Period between `ping` frames while connected. */
    interval: number;
    /** Time allowed for any inbound message after a ping before the socket is closed with code 4000. */
    timeout: number;
  };
  /** Called before every connection attempt to obtain a fresh token. */
  auth: () => Promise<{ token: string }>;
}

/** Callbacks through which {@link RealtimeConnection} reports to its owner. */
interface ConnectionEvents {
  /** Fired on every state transition (never twice in a row for the same state). */
  onStateChange: (state: ConnectionState) => void;
  /** Parsed inbound JSON messages, excluding the protocol frames `pong` and `ack`. */
  onMessage: (data: unknown) => void;
  /** Socket errors while connected, unparseable frames, auth rejection (close 4001) and retry exhaustion. */
  onError: (error: Error) => void;
}

/**
 * WebSocket client with token auth, heartbeats, exponential-backoff
 * reconnection, an offline send queue and optional per-message acks.
 *
 * Protocol frames handled here: outbound `{type:'ping'}`; inbound `pong`
 * (ignored) and `{type:'ack', id}` (settles the matching
 * `send(..., { ack: true })`). Every other inbound JSON message goes to
 * `events.onMessage`.
 */
class RealtimeConnection {
  private ws: WebSocket | null = null;
  private state: ConnectionState = 'disconnected';
  private reconnectAttempts = 0;
  private heartbeatTimer: number | null = null;
  private heartbeatTimeout: number | null = null;
  private reconnectTimer: number | null = null;
  /** Bumped by every connect()/disconnect() so in-flight async work can tell it was superseded. */
  private generation = 0;
  /** Messages sent while not connected; flushed (honouring `ack`) on the next successful connect. */
  private messageQueue: Array<{
    data: string;
    ack: boolean;
    resolve: () => void;
    reject: (err: Error) => void;
  }> = [];
  private lastMessageId = 0;
  private pendingAcks: Map<number, { resolve: () => void; reject: (err: Error) => void; timeout: number }> = new Map();

  constructor(
    private config: ConnectionConfig,
    private events: ConnectionEvents
  ) {}

  /**
   * Opens the socket and resolves once it is open. A no-op while already
   * connected or connecting; cancels any pending automatic retry.
   * @throws if `auth()` rejects, the handshake fails or exceeds 10s, or
   *   `disconnect()` is called before the socket is created.
   */
  async connect(): Promise<void> {
    if (this.state === 'connected' || this.state === 'connecting') {
      return;
    }
    this.clearReconnectTimer();
    const generation = ++this.generation;
    this.setState('connecting');
    try {
      const { token } = await this.config.auth();
      if (generation !== this.generation) {
        // disconnect() (or a newer connect()) ran while we were awaiting auth.
        throw new Error('Connection attempt cancelled');
      }
      const url = new URL(this.config.url);
      url.searchParams.set('token', token);
      const socket = new WebSocket(url.toString(), this.config.protocols);
      this.ws = socket;
      this.setupEventHandlers(socket);
      await this.waitForConnection(socket);
      this.setState('connected');
      this.startHeartbeat();
      this.flushMessageQueue();
      this.reconnectAttempts = 0;
    } catch (error) {
      // Leave state alone if a newer attempt took over, or if the socket's
      // close handler already moved us to 'reconnecting'.
      if (generation === this.generation && this.state === 'connecting') {
        this.setState('disconnected');
      }
      throw error;
    }
  }

  /**
   * Installs the long-lived handlers. Each handler ignores events from a
   * socket that is no longer current, so a stale close cannot clobber a newer connection.
   */
  private setupEventHandlers(socket: WebSocket) {
    socket.onmessage = (event) => {
      if (this.ws === socket) {
        this.handleMessage(event.data);
      }
    };
    socket.onerror = () => {
      // Handshake failures are reported by connect()'s rejection instead.
      if (this.ws === socket && this.state === 'connected') {
        this.events.onError(new Error('WebSocket error'));
      }
    };
    socket.onclose = (event) => {
      if (this.ws === socket) {
        this.handleClose(event);
      }
    };
  }

  /**
   * Resolves when `socket` opens; rejects on error, close, or a 10s timeout.
   * Uses temporary listeners so the permanent handlers are not overwritten.
   */
  private waitForConnection(socket: WebSocket): Promise<void> {
    return new Promise((resolve, reject) => {
      const cleanup = () => {
        window.clearTimeout(timer);
        socket.removeEventListener('open', onOpen);
        socket.removeEventListener('error', onFail);
        socket.removeEventListener('close', onFail);
      };
      const onOpen = () => {
        cleanup();
        resolve();
      };
      const onFail = () => {
        cleanup();
        reject(new Error('Connection failed'));
      };
      const timer = window.setTimeout(() => {
        cleanup();
        reject(new Error('Connection timeout'));
        // Abandon the half-open socket; its close event drives the retry logic.
        socket.close();
      }, 10000);
      socket.addEventListener('open', onOpen);
      socket.addEventListener('error', onFail);
      socket.addEventListener('close', onFail);
    });
  }

  /** Parses one inbound frame, consumes protocol frames, and forwards the rest. */
  private handleMessage(data: string) {
    // Any inbound traffic proves the connection is alive.
    this.resetHeartbeatTimeout();
    try {
      const message = JSON.parse(data);
      if (message.type === 'pong') {
        return;
      }
      if (message.type === 'ack') {
        this.handleAck(message.id);
        return;
      }
      this.events.onMessage(message);
    } catch {
      this.events.onError(new Error(`Failed to parse message: ${data}`));
    }
  }

  /** Settles the `sendWithAck` promise registered under `messageId`, if still pending. */
  private handleAck(messageId: number) {
    const pending = this.pendingAcks.get(messageId);
    if (pending) {
      clearTimeout(pending.timeout);
      pending.resolve();
      this.pendingAcks.delete(messageId);
    }
  }

  /** Tears down the current socket and decides whether to reconnect based on the close code. */
  private handleClose(event: CloseEvent) {
    this.stopHeartbeat();
    this.ws = null;
    this.rejectPendingAcks(new Error('Connection closed'));
    if (event.code === 1000 || event.code === 1001) {
      // Normal closure / going away: don't reconnect.
      this.setState('disconnected');
      return;
    }
    if (event.code === 4001) {
      // Application-defined auth failure: retrying with the same credentials is pointless.
      this.events.onError(new Error('Authentication failed'));
      this.setState('disconnected');
      return;
    }
    if (this.config.reconnect.enabled) {
      this.scheduleReconnect();
    } else {
      this.setState('disconnected');
    }
  }

  /** Arms one retry using capped exponential backoff with jitter, or gives up after `maxAttempts`. */
  private scheduleReconnect() {
    if (this.reconnectAttempts >= this.config.reconnect.maxAttempts) {
      this.events.onError(new Error('Max reconnection attempts reached'));
      this.setState('disconnected');
      return;
    }
    this.setState('reconnecting');
    const { baseDelay, maxDelay, jitterFactor } = this.config.reconnect;
    const exponentialDelay = Math.min(baseDelay * Math.pow(2, this.reconnectAttempts), maxDelay);
    const jitteredDelay = exponentialDelay * (1 + (Math.random() - 0.5) * jitterFactor);
    this.clearReconnectTimer();
    this.reconnectTimer = window.setTimeout(() => {
      this.reconnectTimer = null;
      this.reconnectAttempts++;
      const attempt = this.connect();
      // connect() bumps the generation synchronously; remember this attempt's value.
      const generation = this.generation;
      attempt.catch(() => {
        // Socket-level failures are followed by a close event whose handler
        // schedules the next retry. Failures before a socket exists (e.g.
        // auth() rejecting) produce no close event, so retry from here unless
        // disconnect()/connect() has intervened.
        const stalled =
          generation === this.generation &&
          this.ws === null &&
          this.reconnectTimer === null &&
          this.state === 'disconnected';
        if (stalled) {
          this.scheduleReconnect();
        }
      });
    }, jitteredDelay);
  }

  private clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  private startHeartbeat() {
    this.stopHeartbeat();
    this.heartbeatTimer = window.setInterval(() => {
      this.sendPing();
    }, this.config.heartbeat.interval);
  }

  private stopHeartbeat() {
    if (this.heartbeatTimer !== null) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
    this.resetHeartbeatTimeout();
  }

  /** Sends a ping and, unless a deadline is already pending, arms the liveness deadline. */
  private sendPing() {
    const socket = this.ws;
    if (socket?.readyState !== WebSocket.OPEN) {
      return;
    }
    socket.send(JSON.stringify({ type: 'ping', t: Date.now() }));
    // Keep the earliest outstanding deadline. Replacing the handle without
    // clearing it would leave an orphaned timer that later kills a healthy socket.
    if (this.heartbeatTimeout === null) {
      this.heartbeatTimeout = window.setTimeout(() => {
        this.heartbeatTimeout = null;
        // No inbound traffic since the ping: treat the connection as dead.
        socket.close(4000, 'Heartbeat timeout');
      }, this.config.heartbeat.timeout);
    }
  }

  private resetHeartbeatTimeout() {
    if (this.heartbeatTimeout !== null) {
      clearTimeout(this.heartbeatTimeout);
      this.heartbeatTimeout = null;
    }
  }

  private setState(state: ConnectionState) {
    if (this.state !== state) {
      this.state = state;
      this.events.onStateChange(state);
    }
  }

  private rejectPendingAcks(error: Error) {
    for (const pending of this.pendingAcks.values()) {
      clearTimeout(pending.timeout);
      pending.reject(error);
    }
    this.pendingAcks.clear();
  }

  /**
   * Serializes `data` as JSON and sends it. While not connected the message is
   * queued and the promise settles when it is flushed after the next connect.
   * With `ack: true` (queued or not), the promise resolves only when the server
   * acks it, and rejects after 5s or when the connection closes.
   */
  send(data: unknown, options: { ack?: boolean } = {}): Promise<void> {
    const message = JSON.stringify(data);
    const ack = options.ack === true;
    if (this.state !== 'connected' || !this.ws) {
      return new Promise((resolve, reject) => {
        this.messageQueue.push({ data: message, ack, resolve, reject });
      });
    }
    if (ack) {
      return this.sendWithAck(message);
    }
    this.ws.send(message);
    return Promise.resolve();
  }

  /** Sends `message` tagged with a fresh `_id` and waits (up to 5s) for the matching ack. */
  private sendWithAck(message: string): Promise<void> {
    return new Promise((resolve, reject) => {
      const socket = this.ws;
      if (!socket) {
        reject(new Error('Connection closed'));
        return;
      }
      const messageId = ++this.lastMessageId;
      const messageWithId = JSON.parse(message);
      messageWithId._id = messageId;
      const timeout = window.setTimeout(() => {
        this.pendingAcks.delete(messageId);
        reject(new Error('Ack timeout'));
      }, 5000);
      this.pendingAcks.set(messageId, { resolve, reject, timeout });
      try {
        socket.send(JSON.stringify(messageWithId));
      } catch (error) {
        clearTimeout(timeout);
        this.pendingAcks.delete(messageId);
        reject(error instanceof Error ? error : new Error(String(error)));
      }
    });
  }

  /** Sends queued messages in order, preserving each entry's ack requirement. */
  private flushMessageQueue() {
    while (this.messageQueue.length > 0 && this.state === 'connected' && this.ws) {
      const socket = this.ws;
      const entry = this.messageQueue.shift()!;
      if (entry.ack) {
        this.sendWithAck(entry.data).then(entry.resolve, entry.reject);
        continue;
      }
      try {
        socket.send(entry.data);
        entry.resolve();
      } catch (error) {
        entry.reject(error instanceof Error ? error : new Error(String(error)));
      }
    }
  }

  /**
   * Closes the connection with code 1000 and stops retries, heartbeats and any
   * in-flight connect(). Pending acks are rejected. Queued (never-sent)
   * messages are kept for a later connect().
   */
  disconnect() {
    this.generation++;
    this.clearReconnectTimer();
    this.stopHeartbeat();
    const socket = this.ws;
    this.ws = null;
    if (socket) {
      socket.close(1000, 'Client disconnect');
    }
    this.rejectPendingAcks(new Error('Connection closed'));
    this.setState('disconnected');
  }

  getState(): ConnectionState {
    return this.state;
  }
}
export { RealtimeConnection, ConnectionState, ConnectionConfig };
Subscription and Channel Management
Channel-Based Messaging System
// src/realtime/channels.ts
import { RealtimeConnection } from './connection';
/** Options sent to the server with a channel's subscribe request. */
interface ChannelOptions {
  /** Request presence tracking; required by `updatePresence`. */
  presence?: boolean;
  /** Number of messages to fetch on join. */
  history?: number;
}

/** A client present on a channel. */
interface ChannelMember {
  userId: string;
  /** Key of the channel's local presence map. */
  clientId: string;
  /** Application-defined presence payload (see `updatePresence`). */
  data: Record<string, unknown>;
  joinedAt: number;
}

/** Callbacks for one channel subscription. */
interface ChannelEvents<T> {
  /** Live messages and, after join, replayed history items. */
  onMessage: (message: T) => void;
  onPresenceJoin?: (member: ChannelMember) => void;
  onPresenceLeave?: (member: ChannelMember) => void;
  onPresenceUpdate?: (member: ChannelMember) => void;
  /** Malformed inbound frames for this channel. */
  onError?: (error: Error) => void;
}

type ChannelState = 'unsubscribed' | 'subscribing' | 'subscribed' | 'unsubscribing';

/**
 * One named channel on a shared connection: tracks subscription state, the
 * last seen event id, and a presence map. Inbound frames must be delivered
 * through `handleMessage`.
 */
class Channel<T = unknown> {
  private state: ChannelState = 'unsubscribed';
  /** Presence snapshot keyed by clientId. */
  private members: Map<string, ChannelMember> = new Map();
  /** Latest `eventId` seen; sent as `options.lastEventId` on subscribe, presumably so the server can replay missed events. */
  private lastEventId: string | null = null;

  constructor(
    private name: string,
    private connection: RealtimeConnection,
    private options: ChannelOptions,
    private events: ChannelEvents<T>
  ) {}

  /**
   * Sends an acked subscribe request. No-op while subscribing/subscribed.
   * @throws whatever the connection's `send` rejects with; state returns to 'unsubscribed'.
   */
  async subscribe(): Promise<void> {
    if (this.state === 'subscribed' || this.state === 'subscribing') {
      return;
    }
    this.state = 'subscribing';
    try {
      await this.connection.send({
        type: 'subscribe',
        channel: this.name,
        options: {
          presence: this.options.presence,
          history: this.options.history,
          lastEventId: this.lastEventId,
        },
      }, { ack: true });
      // unsubscribe() may have run while we awaited the ack; don't resurrect the channel.
      if (this.state === 'subscribing') {
        this.state = 'subscribed';
      }
    } catch (error) {
      if (this.state === 'subscribing') {
        this.state = 'unsubscribed';
      }
      throw error;
    }
  }

  /** Sends an acked unsubscribe request; local state is cleared even if it fails. */
  async unsubscribe(): Promise<void> {
    if (this.state === 'unsubscribed' || this.state === 'unsubscribing') {
      return;
    }
    this.state = 'unsubscribing';
    try {
      await this.connection.send({
        type: 'unsubscribe',
        channel: this.name,
      }, { ack: true });
    } finally {
      this.state = 'unsubscribed';
      this.members.clear();
    }
  }

  /**
   * Publishes `data` to the channel and waits for the server ack.
   * @throws Error('Channel not subscribed') unless subscribed.
   */
  async publish(data: T): Promise<void> {
    if (this.state !== 'subscribed') {
      throw new Error('Channel not subscribed');
    }
    await this.connection.send({
      type: 'publish',
      channel: this.name,
      data,
    }, { ack: true });
  }

  /** Applies one inbound frame addressed to this channel. Malformed frames are reported via `onError`. */
  handleMessage(message: {
    type: string;
    data?: T;
    eventId?: string;
    member?: ChannelMember;
    members?: ChannelMember[];
  }) {
    switch (message.type) {
      case 'message': {
        if (message.eventId) {
          this.lastEventId = message.eventId;
        }
        this.events.onMessage(message.data as T);
        break;
      }
      case 'history': {
        // History frames carry a T[] in `data`, despite the field's declared type.
        const history: unknown = message.data;
        if (!Array.isArray(history)) {
          this.events.onError?.(new Error(`Malformed history on channel ${this.name}`));
          break;
        }
        for (const item of history as T[]) {
          this.events.onMessage(item);
        }
        break;
      }
      case 'presence_join': {
        const member = this.requireMember(message);
        if (member) {
          this.members.set(member.clientId, member);
          this.events.onPresenceJoin?.(member);
        }
        break;
      }
      case 'presence_leave': {
        const member = this.requireMember(message);
        if (member) {
          this.members.delete(member.clientId);
          this.events.onPresenceLeave?.(member);
        }
        break;
      }
      case 'presence_update': {
        const member = this.requireMember(message);
        if (member) {
          this.members.set(member.clientId, member);
          this.events.onPresenceUpdate?.(member);
        }
        break;
      }
      case 'presence_sync': {
        // Full presence replacement (e.g. after reconnect); no per-member callbacks.
        this.members.clear();
        for (const member of message.members ?? []) {
          this.members.set(member.clientId, member);
        }
        break;
      }
    }
  }

  /** Returns the frame's member, or reports the malformed frame and returns null. */
  private requireMember(message: { type: string; member?: ChannelMember }): ChannelMember | null {
    if (!message.member) {
      this.events.onError?.(new Error(`${message.type} without member on channel ${this.name}`));
      return null;
    }
    return message.member;
  }

  getMembers(): ChannelMember[] {
    return Array.from(this.members.values());
  }

  /**
   * Publishes this client's presence payload (acked).
   * @throws Error('Presence not enabled for this channel') unless `options.presence`.
   */
  async updatePresence(data: Record<string, unknown>): Promise<void> {
    if (!this.options.presence) {
      throw new Error('Presence not enabled for this channel');
    }
    await this.connection.send({
      type: 'presence_update',
      channel: this.name,
      data,
    }, { ack: true });
  }

  getState(): ChannelState {
    return this.state;
  }
}
// Channel manager for handling multiple subscriptions
/**
 * Owns one {@link Channel} per name on a shared connection.
 *
 * Inbound frames are not routed automatically: the connection's `onMessage`
 * handler must call {@link ChannelManager.routeMessage} with the channel name
 * taken from each frame.
 */
class ChannelManager {
  private channels: Map<string, Channel> = new Map();

  constructor(private connection: RealtimeConnection) {}

  /**
   * Returns the channel for `channelName`, creating and subscribing it on first use.
   * For an existing channel, `events` and `options` are ignored. If the channel's
   * earlier subscribe failed, it is re-subscribed and any error is reported to
   * this caller's `events.onError`.
   */
  subscribe<T>(
    channelName: string,
    events: ChannelEvents<T>,
    options: ChannelOptions = {}
  ): Channel<T> {
    const reportError = (error: unknown) => {
      events.onError?.(error instanceof Error ? error : new Error(String(error)));
    };
    const existingChannel = this.channels.get(channelName) as Channel<T> | undefined;
    if (existingChannel) {
      // A cached channel in 'unsubscribed' state means its subscribe failed; retry
      // instead of handing back a dead channel.
      if (existingChannel.getState() === 'unsubscribed') {
        existingChannel.subscribe().catch(reportError);
      }
      return existingChannel;
    }
    const channel = new Channel<T>(channelName, this.connection, options, events);
    this.channels.set(channelName, channel as Channel);
    channel.subscribe().catch(reportError);
    return channel;
  }

  /** Forgets the channel and sends a best-effort unsubscribe. */
  unsubscribe(channelName: string): void {
    const channel = this.channels.get(channelName);
    if (!channel) {
      return;
    }
    this.channels.delete(channelName);
    channel.unsubscribe().catch(() => {
      // Best effort: the request fails when the connection is already closed, and
      // the local channel state is reset regardless. Swallow so this fire-and-forget
      // call cannot surface as an unhandled rejection.
    });
  }

  /** Delivers an inbound frame to the named channel; frames for unknown channels are dropped. */
  routeMessage(channelName: string, message: unknown) {
    const channel = this.channels.get(channelName);
    if (channel) {
      channel.handleMessage(message as Parameters<Channel['handleMessage']>[0]);
    }
  }
}
export { Channel, ChannelManager, ChannelOptions, ChannelMember };
Backpressure Handling
Client-Side Flow Control
┌─────────────────────────────────────────────────────────────────────────────┐
│ Backpressure Handling Strategy │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Message Flow with Backpressure │
│ ────────────────────────────── │
│ │
│ Server Client UI │
│ ────── ────── ── │
│ │ │ │ │
│ │ ──── 100 msg/s ────▶ │ │ │
│ │ │ │ │
│ │ ┌─────▼─────┐ │ │
│ │ │ Inbound │ │ │
│ │ │ Buffer │ │ │
│ │ │ (1000) │ │ │
│ │ └─────┬─────┘ │ │
│ │ │ │ │
│ │ ┌─────▼─────┐ │ │
│ │ │ Sampler │ │ │
│ │ │ & Merger │ │ │
│ │ └─────┬─────┘ │ │
│ │ │ │ │
│ │ ┌─────▼─────┐ │ │
│ │ │ Debounce │ │ │
│ │ │ (16ms) │ │ │
│ │ └─────┬─────┘ │ │
│ │ │ │ │
│ │ │ ──── 60 updates/s ────▶│ │
│ │ │ │ │
│ │ │
│ │ Buffer Full (backpressure signal) │
│ │ ◀───────────────────────│ │
│ │ │ │
│ │ Reduce rate / Batch │
│ │ ──── 50 msg/s ────▶ │ │
│ │ │ │
└─────────────────────────────────────────────────────────────────────────────┘
// src/realtime/backpressure.ts
/** Tuning for {@link BackpressureBuffer}. */
interface BackpressureConfig {
  /** Capacity; pushes into a full buffer are handled by `samplingStrategy`. */
  maxBufferSize: number;
  /** Fill ratio (fraction of maxBufferSize) at or above which pressure is 'high'. */
  highWaterMark: number;
  /** Fill ratio at or above which pressure is 'moderate'; below it, 'none'. */
  lowWaterMark: number;
  /**
   * Overflow policy when full. 'latest' coalesces by key or evicts the oldest item.
   * 'sample' keeps about half of the incoming items, evicting the oldest. 'batch'
   * evicts the oldest ~10%.
   */
  samplingStrategy: 'latest' | 'sample' | 'batch';
  /** Minimum milliseconds between two handler invocations. */
  batchInterval: number;
  /** Called whenever the pressure level changes. */
  onPressure: (level: 'none' | 'moderate' | 'high') => void;
}

type MessageHandler<T> = (messages: T[]) => void;

/**
 * Bounded inbound buffer that drains to `handler` in batches paced by
 * `requestAnimationFrame`. It sheds load per `samplingStrategy` when full and
 * deduplicates each batch by key when a `keyExtractor` is given.
 */
class BackpressureBuffer<T> {
  private buffer: T[] = [];
  private pressureLevel: 'none' | 'moderate' | 'high' = 'none';
  private processingScheduled = false;
  private lastProcessTime = 0;

  constructor(
    private config: BackpressureConfig,
    private handler: MessageHandler<T>,
    private keyExtractor?: (item: T) => string
  ) {}

  /** Enqueues `item` (or applies the overflow policy) and ensures a drain is scheduled. */
  push(item: T) {
    if (this.buffer.length >= this.config.maxBufferSize) {
      this.applyBackpressure(item);
    } else {
      this.buffer.push(item);
    }
    this.updatePressureLevel();
    // Schedule even on overflow, so a full buffer can never sit undrained.
    this.scheduleProcessing();
  }

  /** Overflow policy for a push into a full buffer. */
  private applyBackpressure(item: T) {
    switch (this.config.samplingStrategy) {
      case 'latest': {
        // Replace a queued item with the same key; otherwise evict the oldest.
        const extract = this.keyExtractor;
        if (extract) {
          const key = extract(item);
          const existingIndex = this.buffer.findIndex((queued) => extract(queued) === key);
          if (existingIndex !== -1) {
            this.buffer[existingIndex] = item;
            break;
          }
        }
        this.buffer.shift();
        this.buffer.push(item);
        break;
      }
      case 'sample': {
        // Probabilistic sampling: keep ~50% of incoming items under pressure.
        if (Math.random() < 0.5) {
          this.buffer.shift();
          this.buffer.push(item);
        }
        break;
      }
      case 'batch': {
        // Evict the oldest ~10% at once to make room for a burst.
        const dropCount = Math.ceil(this.buffer.length * 0.1);
        this.buffer.splice(0, dropCount);
        this.buffer.push(item);
        break;
      }
    }
  }

  /** Recomputes the pressure level from the fill ratio and notifies on change. */
  private updatePressureLevel() {
    const fillRatio = this.buffer.length / this.config.maxBufferSize;
    let newLevel: 'none' | 'moderate' | 'high';
    if (fillRatio >= this.config.highWaterMark) {
      newLevel = 'high';
    } else if (fillRatio >= this.config.lowWaterMark) {
      newLevel = 'moderate';
    } else {
      newLevel = 'none';
    }
    if (newLevel !== this.pressureLevel) {
      this.pressureLevel = newLevel;
      this.config.onPressure(newLevel);
    }
  }

  private scheduleProcessing() {
    if (this.processingScheduled) return;
    this.processingScheduled = true;
    // Pace delivery to the display's frame rate.
    requestAnimationFrame(() => {
      this.processingScheduled = false;
      this.process();
    });
  }

  /** Hands one batch to the handler; larger batches under higher pressure. */
  private process() {
    const now = performance.now();
    if (now - this.lastProcessTime < this.config.batchInterval) {
      this.scheduleProcessing();
      return;
    }
    if (this.buffer.length === 0) return;

    const batch = this.buffer.splice(0, Math.min(this.buffer.length, this.batchLimit()));
    this.lastProcessTime = now;
    const processedBatch = this.keyExtractor ? this.deduplicateBatch(batch) : batch;
    try {
      this.handler(processedBatch);
    } finally {
      // Runs even if the handler throws, so one failing batch cannot strand
      // the remaining items until the next push.
      this.updatePressureLevel();
      if (this.buffer.length > 0) {
        this.scheduleProcessing();
      }
    }
  }

  /** Items per batch for the current pressure level. */
  private batchLimit(): number {
    switch (this.pressureLevel) {
      case 'high':
        return 100;
      case 'moderate':
        return 50;
      default:
        return 20;
    }
  }

  /** Keeps the latest value per key, at the position of that key's first occurrence. */
  private deduplicateBatch(batch: T[]): T[] {
    const latestByKey = new Map<string, T>();
    for (const item of batch) {
      latestByKey.set(this.keyExtractor!(item), item);
    }
    return Array.from(latestByKey.values());
  }

  getStats(): {
    bufferSize: number;
    pressureLevel: string;
    fillRatio: number;
  } {
    return {
      bufferSize: this.buffer.length,
      pressureLevel: this.pressureLevel,
      fillRatio: this.buffer.length / this.config.maxBufferSize,
    };
  }

  /** Drops all buffered items and resets pressure accordingly. */
  clear() {
    this.buffer = [];
    this.updatePressureLevel();
  }
}
export { BackpressureBuffer, BackpressureConfig };
Adaptive Rate Control
// src/realtime/rate-control.ts
/** Tuning for {@link AdaptiveRateController}. */
interface RateControlConfig {
  targetFrameRate: number; // e.g., 60fps
  /** Sliding window (ms) over which frame durations are averaged. */
  measurementWindow: number;
  minRate: number; // Minimum messages per second
  maxRate: number; // Maximum messages per second
}

/**
 * Adapts a message rate (messages/second) to the observed UI frame rate.
 *
 * Every animation frame, the controller records the frame's duration. At most
 * every 500ms it compares the windowed average FPS with the target. Below 90%
 * of target it cuts the rate by 20% (down to `minRate`). Once FPS recovers above
 * 95% it raises the rate by 10% per step, up to `maxRate`. `shouldProcess()`
 * admits messages through a token bucket refilled at the current rate.
 */
class AdaptiveRateController {
  /** Recent frames as (end timestamp, duration) pairs, oldest first. */
  private frameTimings: Array<{ at: number; duration: number }> = [];
  private currentRate: number;
  private lastAdjustment = 0;
  private isThrottling = false;
  /** Token bucket state for shouldProcess(). */
  private tokens: number;
  private lastRefill: number;
  private rafHandle: number | null = null;
  private destroyed = false;

  constructor(private config: RateControlConfig) {
    this.currentRate = config.maxRate;
    this.tokens = this.bucketCapacity();
    this.lastRefill = performance.now();
    this.monitorFrameRate();
  }

  /** Starts the requestAnimationFrame loop that samples frame durations. */
  private monitorFrameRate() {
    let lastFrameTime = performance.now();
    const measureFrame = () => {
      if (this.destroyed) return;
      const now = performance.now();
      this.frameTimings.push({ at: now, duration: now - lastFrameTime });
      lastFrameTime = now;
      // Prune by timestamp: keep only frames that ended inside the window.
      const cutoff = now - this.config.measurementWindow;
      while (this.frameTimings.length > 0 && this.frameTimings[0].at < cutoff) {
        this.frameTimings.shift();
      }
      this.adjustRate();
      this.rafHandle = requestAnimationFrame(measureFrame);
    };
    this.rafHandle = requestAnimationFrame(measureFrame);
  }

  private adjustRate() {
    const now = performance.now();
    // Only adjust every 500ms
    if (now - this.lastAdjustment < 500) return;
    this.lastAdjustment = now;
    // Too few samples for a meaningful average.
    if (this.frameTimings.length < 10) return;

    const avgFrameDuration =
      this.frameTimings.reduce((sum, frame) => sum + frame.duration, 0) / this.frameTimings.length;
    const currentFPS = 1000 / avgFrameDuration;
    const targetFPS = this.config.targetFrameRate;

    if (currentFPS < targetFPS * 0.9) {
      // Frame rate dropping, reduce message rate
      this.currentRate = Math.max(this.config.minRate, this.currentRate * 0.8);
      this.isThrottling = true;
    } else if (currentFPS > targetFPS * 0.95 && this.isThrottling) {
      // Frame rate recovered, gradually increase
      this.currentRate = Math.min(this.config.maxRate, this.currentRate * 1.1);
      if (this.currentRate >= this.config.maxRate) {
        this.isThrottling = false;
      }
    }
  }

  /** Burst size: one frame's worth of messages at the current rate, never less than one. */
  private bucketCapacity(): number {
    const perFrame =
      this.config.targetFrameRate > 0
        ? this.currentRate / this.config.targetFrameRate
        : this.currentRate;
    return Math.max(1, perFrame);
  }

  /**
   * Token bucket admission check: returns true and consumes a token when one is
   * available. Tokens refill continuously at `currentRate` per second.
   */
  shouldProcess(): boolean {
    const now = performance.now();
    const elapsed = Math.max(0, now - this.lastRefill);
    this.lastRefill = now;
    this.tokens = Math.min(this.bucketCapacity(), this.tokens + (elapsed * this.currentRate) / 1000);
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }

  getCurrentRate(): number {
    return this.currentRate;
  }

  isUnderPressure(): boolean {
    return this.isThrottling;
  }

  /** Stops the frame-monitoring loop; the last computed rate stays in effect. */
  destroy(): void {
    this.destroyed = true;
    if (this.rafHandle !== null) {
      cancelAnimationFrame(this.rafHandle);
      this.rafHandle = null;
    }
  }
}
export { AdaptiveRateController };
State Reconciliation After Disconnects
Delta Sync Protocol
┌─────────────────────────────────────────────────────────────────────────────┐
│ State Reconciliation After Reconnect │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Disconnect Reconnect │
│ ────────── ───────── │
│ │ │ │
│ │ │ │
│ │ Client State: v5 │ │
│ │ Last Event ID: evt_abc123 │ │
│ │ │ │
│ ▼ │ │
│ ┌───────────────────────────────────────┼────────────────────────────────┐│
│ │ Offline Period │ ││
│ │ │ ││
│ │ Server State Changes: │ ││
│ │ - evt_abc124: User "alice" joined │ ││
│ │ - evt_abc125: Message "hello" │ ││
│ │ - evt_abc126: User "bob" left │ ││
│ │ - evt_abc127: Message "world" │ ││
│ │ │ ││
│ └───────────────────────────────────────┼────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────────┐│
│ │ Reconnection Flow ││
│ │ ││
│ │ 1. Client sends: { type: "sync", lastEventId: "evt_abc123" } ││
│ │ ││
│ │ 2. Server checks: Do we have events since evt_abc123? ││
│ │ YES → Send delta (events abc124-abc127) ││
│ │ NO → Send full state snapshot ││
│ │ ││
│ │ 3. Client applies delta in order ││
│ │ ││
│ │ 4. Client state: v9, Last Event ID: evt_abc127 ││
│ │ ││
│ └────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
// src/realtime/reconciliation.ts
/** A server-ordered state change. */
interface StateEvent {
  /** Event id; compared with pending operation ids to recognise this client's own operations. */
  id: string;
  type: string;
  timestamp: number;
  data: unknown;
  /** Monotonic version used to order delta events. */
  version: number;
}

/** Server reply to a `sync` request: either the missed events or a full snapshot. */
interface SyncResponse {
  type: 'delta' | 'snapshot';
  /** Present for 'delta'. */
  events?: StateEvent[];
  /** Present for 'snapshot'. */
  state?: unknown;
  latestEventId: string;
  version: number;
}

interface ReconciliationConfig {
  /** Max events before forcing snapshot. Not read by StateReconciler; presumably enforced server-side (TODO confirm). */
  maxDeltaEvents: number;
  /** How a snapshot is combined with local state and pending operations. */
  conflictResolution: 'server-wins' | 'client-wins' | 'merge';
  /** Used by 'merge' to combine server and client state. */
  onConflict?: (serverState: unknown, clientState: unknown) => unknown;
}

/**
 * Keeps local state in step with the server. Local operations are applied
 * optimistically and tracked until the server echoes an event with the same id.
 * After a (re)connect, `reconcile()` requests a delta or snapshot and replays
 * unconfirmed operations.
 *
 * The application must deliver the server's sync reply to
 * {@link StateReconciler.handleSyncResponse}, and live events to
 * {@link StateReconciler.handleServerEvent}.
 */
class StateReconciler<T> {
  private lastEventId: string | null = null;
  private version = 0;
  private pendingOperations: Array<{
    id: string;
    operation: unknown;
    timestamp: number;
  }> = [];
  /** The outstanding sync request awaiting handleSyncResponse(), if any. */
  private pendingSync: {
    resolve: (response: SyncResponse) => void;
    reject: (error: Error) => void;
    timer: ReturnType<typeof setTimeout>;
  } | null = null;

  constructor(
    private config: ReconciliationConfig,
    private applyEvent: (state: T, event: StateEvent) => T,
    private getState: () => T,
    private setState: (state: T) => void
  ) {}

  /**
   * Call when the connection is (re-)established. Waits for the sync reply,
   * applies it, then resends unconfirmed operations.
   * @throws Error('Sync timeout') if no reply arrives within 10s, or the send error.
   */
  async reconcile(connection: RealtimeConnection): Promise<void> {
    const syncResponse = await this.requestSync(connection);
    if (syncResponse.type === 'delta') {
      this.applyDelta(syncResponse.events ?? []);
    } else {
      this.applySnapshot(syncResponse.state as T);
    }
    this.lastEventId = syncResponse.latestEventId;
    this.version = syncResponse.version;
    await this.replayPendingOperations(connection);
  }

  /** Completes the in-flight `reconcile()`. Replies with no request outstanding are ignored. */
  handleSyncResponse(response: SyncResponse): void {
    this.takePendingSync()?.resolve(response);
  }

  /** Detaches the outstanding sync request (clearing its timer) and returns it. */
  private takePendingSync() {
    const pending = this.pendingSync;
    if (pending) {
      clearTimeout(pending.timer);
      this.pendingSync = null;
    }
    return pending;
  }

  /** Sends a `sync` request and resolves with the reply passed to handleSyncResponse(). */
  private requestSync(connection: RealtimeConnection): Promise<SyncResponse> {
    // Only one sync may be outstanding; a newer request supersedes the old one.
    this.takePendingSync()?.reject(new Error('Sync superseded by a newer request'));
    return new Promise<SyncResponse>((resolve, reject) => {
      const timer = setTimeout(() => {
        if (this.pendingSync === request) {
          this.takePendingSync();
          reject(new Error('Sync timeout'));
        }
      }, 10000);
      const request = { resolve, reject, timer };
      this.pendingSync = request;
      connection.send({
        type: 'sync',
        lastEventId: this.lastEventId,
        version: this.version,
        pendingOps: this.pendingOperations.map(op => op.id),
      }, { ack: true }).catch((error: unknown) => {
        // A failed send only fails this request if no reply has arrived yet.
        if (this.pendingSync === request) {
          this.takePendingSync();
          reject(error instanceof Error ? error : new Error(String(error)));
        }
      });
    });
  }

  /** Applies missed events in version order without mutating the caller's array. */
  private applyDelta(events: StateEvent[]) {
    let state = this.getState();
    const ordered = [...events].sort((a, b) => a.version - b.version);
    for (const event of ordered) {
      const pendingIndex = this.pendingOperations.findIndex(op => op.id === event.id);
      if (pendingIndex !== -1) {
        // Our own operation, already applied optimistically: confirm it without
        // applying it twice (same rule as handleServerEvent).
        this.pendingOperations.splice(pendingIndex, 1);
        continue;
      }
      state = this.applyEvent(state, event);
    }
    this.setState(state);
  }

  /** Replaces local state with a server snapshot according to `conflictResolution`. */
  private applySnapshot(serverState: T) {
    const clientState = this.getState();
    let finalState: T;
    switch (this.config.conflictResolution) {
      case 'server-wins':
        // NOTE(review): pending ops are dropped from state here but still replayed;
        // their confirming events are then not applied by handleServerEvent.
        finalState = serverState;
        break;
      case 'client-wins':
        // Start with server state but re-apply unconfirmed local operations.
        finalState = serverState;
        for (const pending of this.pendingOperations) {
          finalState = this.applyEvent(finalState, {
            id: pending.id,
            type: 'pending',
            timestamp: pending.timestamp,
            data: pending.operation,
            version: this.version,
          });
        }
        break;
      case 'merge':
        finalState = this.config.onConflict
          ? (this.config.onConflict(serverState, clientState) as T)
          : serverState;
        break;
    }
    this.setState(finalState);
  }

  /** Resends unconfirmed operations, sequentially, marked as retries. */
  private async replayPendingOperations(connection: RealtimeConnection) {
    for (const pending of this.pendingOperations) {
      await connection.send({
        type: 'operation',
        id: pending.id,
        data: pending.operation,
        isRetry: true,
      }, { ack: true });
    }
  }

  /**
   * Applies `operation` optimistically and sends it. A failed send is logged and
   * the operation stays pending so the next reconcile() replays it.
   */
  async performOperation(
    connection: RealtimeConnection,
    operation: unknown
  ): Promise<void> {
    const operationId = crypto.randomUUID();
    const timestamp = Date.now();
    this.pendingOperations.push({ id: operationId, operation, timestamp });

    const newState = this.applyEvent(this.getState(), {
      id: operationId,
      type: 'optimistic',
      timestamp,
      data: operation,
      version: this.version + 1,
    });
    this.setState(newState);

    try {
      await connection.send({
        type: 'operation',
        id: operationId,
        data: operation,
      }, { ack: true });
    } catch (error) {
      console.error('Operation failed:', error);
    }
  }

  /**
   * Handles a live event. An event confirming one of our pending operations only
   * clears it, since it was already applied optimistically. Any other event is applied.
   */
  handleServerEvent(event: StateEvent) {
    const pendingIndex = this.pendingOperations.findIndex(op => op.id === event.id);
    if (pendingIndex !== -1) {
      this.pendingOperations.splice(pendingIndex, 1);
    } else {
      this.setState(this.applyEvent(this.getState(), event));
    }
    this.lastEventId = event.id;
    this.version = event.version;
  }

  getLastEventId(): string | null {
    return this.lastEventId;
  }

  getPendingCount(): number {
    return this.pendingOperations.length;
  }
}
export { StateReconciler, StateEvent, SyncResponse };
Consistency Guarantees
Ordering and Delivery Semantics
┌─────────────────────────────────────────────────────────────────────────────┐
│ Message Ordering Guarantees │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Delivery Semantics │
│ ────────────────── │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ At-Most-Once At-Least-Once Exactly-Once │ │
│ │ ───────────── ────────────── ──────────── │ │
│ │ │ │
│ │ Fire and forget Retry until ack Dedup + Retry │ │
│ │ │ │
│ │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │ │
│ │ │ A │───▶│ B │ │ A │───▶│ B │ │ A │───▶│ B │ │ │
│ │ └───┘ └───┘ └───┘ └───┘ └───┘ └───┘ │ │
│ │ │ │ │ │
│ │ Lost? Too bad. ├──▶ Retry ├──▶ Retry │ │
│ │ ├──▶ Retry │ │ │
│ │ ▼ ▼ │ │
│ │ May get Dedup by │ │
│ │ duplicates message ID │ │
│ │ │ │
│ │ Use for: Use for: Use for: │ │
│ │ - Typing indicators - Chat messages - Financial txns │ │
│ │ - Mouse position - Notifications - Order placement │ │
│ │ - Heartbeats - State updates - Inventory updates │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Ordering Guarantees │
│ ─────────────────── │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ No Ordering FIFO (per sender) Total Order │ │
│ │ ─────────── ───────────────── ─────────── │ │
│ │ │ │
│ │ Messages may Messages from same All clients see │ │
│ │ arrive out of sender arrive in same order │ │
│ │ order order │ │
│ │ │ │
│ │ A1 ──┐ A1 ──┐ A1 ──┐ │ │
│ │ A2 ──┼──▶ ? A2 ──┼──▶ A1,A2 A2 ──┼──▶ A1,B1,A2 │ │
│ │ B1 ──┘ B1 ──┘ B1 (any) B1 ──┘ (same for │ │
│ │ all) │ │
│ │ │ │
│ │ Implementation: Implementation: Implementation: │ │
│ │ - None needed - Sequence numbers - Central sequencer │ │
│ │ - Per-sender - Lamport clocks │ │
│ │ - Vector clocks │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
// src/realtime/ordering.ts
interface OrderedMessage<T> {
id: string;
senderId: string;
sequenceNumber: number;
globalSequence?: number;
timestamp: number;
data: T;
}
interface OrderingConfig {
type: 'none' | 'fifo' | 'total';
maxBufferSize: number;
maxWaitTime: number;
}
class MessageOrderer<T> {
// Per-sender sequence tracking
private expectedSequence: Map<string, number> = new Map();
// Buffer for out-of-order messages
private pendingMessages: Map<string, OrderedMessage<T>[]> = new Map();
// For total ordering
private globalSequence = 0;
private globalBuffer: OrderedMessage<T>[] = [];
constructor(
private config: OrderingConfig,
private onMessage: (message: T) => void
) {}
receive(message: OrderedMessage<T>) {
switch (this.config.type) {
case 'none':
this.onMessage(message.data);
break;
case 'fifo':
this.processFIFO(message);
break;
case 'total':
this.processTotalOrder(message);
break;
}
}
private processFIFO(message: OrderedMessage<T>) {
const { senderId, sequenceNumber } = message;
// Initialize expected sequence for new sender
if (!this.expectedSequence.has(senderId)) {
this.expectedSequence.set(senderId, sequenceNumber);
}
const expected = this.expectedSequence.get(senderId)!;
if (sequenceNumber === expected) {
// In order, deliver immediately
this.onMessage(message.data);
this.expectedSequence.set(senderId, expected + 1);
// Check if we can deliver buffered messages
this.deliverPending(senderId);
} else if (sequenceNumber > expected) {
// Out of order, buffer it
this.bufferMessage(senderId, message);
// Set timeout to deliver anyway (gap may be permanent)
setTimeout(() => {
this.deliverPendingUpTo(senderId, sequenceNumber);
}, this.config.maxWaitTime);
}
// sequenceNumber < expected means duplicate, ignore
}
private bufferMessage(senderId: string, message: OrderedMessage<T>) {
if (!this.pendingMessages.has(senderId)) {
this.pendingMessages.set(senderId, []);
}
const buffer = this.pendingMessages.get(senderId)!;
// Check buffer size limit
if (buffer.length >= this.config.maxBufferSize) {
// Drop oldest and force delivery
const oldest = buffer.shift()!;
this.onMessage(oldest.data);
}
// Insert in sorted order
const insertIndex = buffer.findIndex(
m => m.sequenceNumber > message.sequenceNumber
);
if (insertIndex === -1) {
buffer.push(message);
} else {
buffer.splice(insertIndex, 0, message);
}
}
/**
 * Flush every buffered message for `senderId` that is contiguous with the
 * sender's expected sequence number, advancing the expectation as it goes.
 *
 * Entries below the expected number (already delivered or skipped) are
 * discarded. Otherwise a stale entry at the head of the sorted buffer would
 * block every later message from ever being flushed.
 */
private deliverPending(senderId: string) {
  const buffer = this.pendingMessages.get(senderId);
  if (!buffer || buffer.length === 0) return;
  let expected = this.expectedSequence.get(senderId)!;
  while (buffer.length > 0 && buffer[0].sequenceNumber <= expected) {
    const message = buffer.shift()!;
    if (message.sequenceNumber === expected) {
      this.onMessage(message.data);
      expected++;
    }
    // else: stale duplicate, dropped
  }
  this.expectedSequence.set(senderId, expected);
}
/**
 * Gap-timeout handler: gives up on any messages still missing up to and
 * including `upToSequence`.
 *
 * Buffered messages in that range are delivered in order, and the sender's
 * expected sequence moves past `upToSequence`. Buffered messages that are now
 * contiguous are flushed as well.
 *
 * It walks the sorted buffer rather than every integer in the gap, so a large
 * jump in sequence numbers costs nothing extra.
 */
private deliverPendingUpTo(senderId: string, upToSequence: number) {
  const buffer = this.pendingMessages.get(senderId);
  // No buffer (e.g. reset() cleared it): nothing to flush or advance.
  if (!buffer) return;
  let expected = this.expectedSequence.get(senderId)!;
  while (buffer.length > 0 && buffer[0].sequenceNumber <= upToSequence) {
    const message = buffer.shift()!;
    // Entries below `expected` were already delivered or skipped.
    if (message.sequenceNumber >= expected) {
      this.onMessage(message.data);
      expected = message.sequenceNumber + 1;
    }
  }
  // Anything still missing in the range is considered lost.
  this.expectedSequence.set(senderId, Math.max(expected, upToSequence + 1));
  // Successors just past the gap may now be contiguous.
  this.deliverPending(senderId);
}
/**
 * Total ordering: deliver messages strictly in `globalSequence` order
 * (1, 2, 3, …).
 *
 * - A message ahead of the next expected number is buffered once, in sorted
 *   position.
 * - When the buffer exceeds `config.maxBufferSize`, the lowest buffered
 *   message is force-delivered and the gap before it is treated as lost.
 * - Messages at or below the last delivered number are duplicates and are
 *   ignored.
 * - Messages without a finite numeric globalSequence cannot be placed in the
 *   order; they are dropped with a warning.
 */
private processTotalOrder(message: OrderedMessage<T>) {
  const globalSeq = message.globalSequence;
  if (typeof globalSeq !== 'number' || !Number.isFinite(globalSeq)) {
    console.warn('MessageOrderer: dropping message without a valid globalSequence');
    return;
  }
  const nextExpected = this.globalSequence + 1;
  if (globalSeq < nextExpected) {
    // Duplicate if globalSeq <= this.globalSequence
    return;
  }
  if (globalSeq === nextExpected) {
    // In order
    this.onMessage(message.data);
    this.globalSequence = globalSeq;
    this.deliverGlobalPending();
    return;
  }
  // Out of order: buffer it once. A second copy would later sit at the head
  // of the buffer and stall delivery.
  if (this.globalBuffer.some(m => m.globalSequence === globalSeq)) {
    return;
  }
  // Sorted insertion instead of re-sorting the whole buffer on every message.
  const insertIndex = this.globalBuffer.findIndex(m => m.globalSequence! > globalSeq);
  if (insertIndex === -1) {
    this.globalBuffer.push(message);
  } else {
    this.globalBuffer.splice(insertIndex, 0, message);
  }
  // Limit buffer size
  if (this.globalBuffer.length > this.config.maxBufferSize) {
    // Force delivery of the lowest buffered message; the gap before it is lost.
    const forced = this.globalBuffer.shift()!;
    this.onMessage(forced.data);
    this.globalSequence = forced.globalSequence!;
    this.deliverGlobalPending();
  }
}
/**
 * Flush buffered total-order messages that are contiguous with the last
 * delivered global sequence number.
 *
 * Entries at or below that number (duplicates) are discarded. Otherwise they
 * would sit at the head of the sorted buffer and block everything behind them.
 */
private deliverGlobalPending() {
  while (this.globalBuffer.length > 0) {
    const headSeq = this.globalBuffer[0].globalSequence!;
    if (headSeq <= this.globalSequence) {
      this.globalBuffer.shift();
      continue;
    }
    if (headSeq !== this.globalSequence + 1) {
      // Still a gap before the head; wait for it (or a forced flush).
      break;
    }
    const message = this.globalBuffer.shift()!;
    this.onMessage(message.data);
    this.globalSequence = headSeq;
  }
}
/**
 * Discard all ordering state: total-order progress and buffer, plus every
 * sender's expected sequence and pending buffer.
 *
 * Gap timers already scheduled by processFIFO are not cancelled here, because
 * their handles are not stored.
 */
reset() {
  this.globalBuffer = [];
  this.globalSequence = 0;
  this.pendingMessages.clear();
  this.expectedSequence.clear();
}
}
export { MessageOrderer, OrderedMessage, OrderingConfig };
React Integration
Real-Time Hooks
// src/realtime/hooks.ts
import { useEffect, useRef, useState, useCallback, useSyncExternalStore } from 'react';
import { RealtimeConnection, ConnectionState } from './connection';
import { Channel, ChannelMember } from './channels';
import { BackpressureBuffer } from './backpressure';
// Connection hook
/**
 * Create, connect and own a RealtimeConnection for `config.url`.
 *
 * A new connection is created on mount and whenever `config.url` changes. The
 * previous one is disconnected on cleanup.
 *
 * The connection object is kept in React state (not just a ref), so components
 * re-render when it is created or replaced, and hooks that take it as a
 * dependency see the new instance.
 *
 * `config.auth` is read through a ref, so whenever the connection invokes it,
 * the caller's latest callback is used without recreating the connection.
 *
 * @returns the current connection (null until created), its state, the last
 *   error reported by the connection, and an `isConnected` convenience flag.
 */
function useRealtimeConnection(config: {
  url: string;
  auth: () => Promise<{ token: string }>;
}) {
  const [connection, setConnection] = useState<RealtimeConnection | null>(null);
  const [state, setState] = useState<ConnectionState>('disconnected');
  const [error, setError] = useState<Error | null>(null);
  // Always holds the caller's most recent auth callback.
  const authRef = useRef(config.auth);
  useEffect(() => {
    authRef.current = config.auth;
  });
  useEffect(() => {
    const instance = new RealtimeConnection(
      {
        url: config.url,
        auth: () => authRef.current(),
        reconnect: {
          enabled: true,
          maxAttempts: 10,
          baseDelay: 1000,
          maxDelay: 30000,
          jitterFactor: 0.3,
        },
        heartbeat: {
          interval: 30000,
          timeout: 10000,
        },
      },
      {
        onStateChange: setState,
        onMessage: () => {}, // Will be overridden by channels
        onError: setError,
      }
    );
    setConnection(instance);
    // Errors from a previous URL's connection no longer apply.
    setError(null);
    instance.connect();
    return () => {
      instance.disconnect();
      // Only clear if a newer connection hasn't already replaced this one.
      setConnection(current => (current === instance ? null : current));
    };
  }, [config.url]);
  return {
    connection,
    state,
    error,
    isConnected: state === 'connected',
  };
}
// Channel subscription hook
/**
 * Subscribe to `channelName` over `connection` and expose the channel's
 * message log, presence members, and publish / presence helpers.
 *
 * The subscription is created once the connection reports 'connected'. It is
 * recreated when any of these change:
 * - the connection object;
 * - its connected status;
 * - the channel name;
 * - the presence, history or buffer options.
 * Cleanup unsubscribes and resets `isSubscribed` and `members`.
 *
 * The connected status is sampled with `connection.getState()` during render.
 * The effect therefore re-runs on the first render after the status changes,
 * for example when the component that owns `useRealtimeConnection` re-renders
 * on its state update.
 *
 * `options.onMessage` is read through a ref: passing a new callback each
 * render does not resubscribe, and the latest callback is always invoked.
 *
 * With `options.bufferConfig.enabled`, incoming messages pass through a
 * BackpressureBuffer and are appended in batches.
 */
function useChannel<T>(
  connection: RealtimeConnection | null,
  channelName: string,
  options: {
    presence?: boolean;
    history?: number;
    onMessage?: (message: T) => void;
    bufferConfig?: {
      enabled: boolean;
      maxSize: number;
      batchInterval: number;
    };
  } = {}
) {
  const channelRef = useRef<Channel<T> | null>(null);
  const onMessageRef = useRef(options.onMessage);
  const [messages, setMessages] = useState<T[]>([]);
  const [members, setMembers] = useState<ChannelMember[]>([]);
  const [isSubscribed, setIsSubscribed] = useState(false);
  useEffect(() => {
    onMessageRef.current = options.onMessage;
  });
  const isConnected = connection?.getState() === 'connected';
  const bufferEnabled = options.bufferConfig?.enabled ?? false;
  const bufferMaxSize = options.bufferConfig?.maxSize;
  const bufferBatchInterval = options.bufferConfig?.batchInterval;
  useEffect(() => {
    if (!connection || connection.getState() !== 'connected') {
      return;
    }
    // Stops a subscribe() that settles after cleanup from flipping state.
    let active = true;
    // Append a batch to the log and notify the caller for each message.
    const deliver = (batch: T[]) => {
      setMessages(prev => [...prev, ...batch]);
      batch.forEach(msg => onMessageRef.current?.(msg));
    };
    // Setup backpressure buffer if enabled
    const buffer = options.bufferConfig?.enabled
      ? new BackpressureBuffer<T>(
          {
            maxBufferSize: options.bufferConfig.maxSize,
            highWaterMark: 0.8,
            lowWaterMark: 0.5,
            samplingStrategy: 'latest',
            batchInterval: options.bufferConfig.batchInterval,
            onPressure: (level) => {
              console.log(`Backpressure: ${level}`);
            },
          },
          deliver
        )
      : null;
    // Replace a known member's record or append a new one (no duplicates, e.g.
    // when a presence sync after reconnect re-announces existing members).
    const upsertMember = (member: ChannelMember) => {
      setMembers(prev =>
        prev.some(m => m.clientId === member.clientId)
          ? prev.map(m => (m.clientId === member.clientId ? member : m))
          : [...prev, member]
      );
    };
    const channel = new Channel<T>(
      channelName,
      connection,
      {
        presence: options.presence,
        history: options.history,
      },
      {
        onMessage: (message) => {
          if (buffer) {
            buffer.push(message);
          } else {
            deliver([message]);
          }
        },
        onPresenceJoin: upsertMember,
        onPresenceLeave: (member) => {
          setMembers(prev => prev.filter(m => m.clientId !== member.clientId));
        },
        onPresenceUpdate: upsertMember,
      }
    );
    channelRef.current = channel;
    channel.subscribe()
      .then(() => {
        if (active) setIsSubscribed(true);
      })
      .catch(console.error);
    return () => {
      active = false;
      channel.unsubscribe();
      buffer?.clear();
      if (channelRef.current === channel) {
        channelRef.current = null;
      }
      setIsSubscribed(false);
      // Presence of an unsubscribed channel is no longer meaningful.
      setMembers([]);
    };
  }, [
    connection,
    isConnected,
    channelName,
    options.presence,
    options.history,
    bufferEnabled,
    bufferMaxSize,
    bufferBatchInterval,
  ]);
  const publish = useCallback(async (data: T) => {
    if (channelRef.current) {
      await channelRef.current.publish(data);
    }
  }, []);
  const updatePresence = useCallback(async (data: Record<string, unknown>) => {
    if (channelRef.current) {
      await channelRef.current.updatePresence(data);
    }
  }, []);
  const clearMessages = useCallback(() => setMessages([]), []);
  return {
    messages,
    members,
    isSubscribed,
    publish,
    updatePresence,
    clearMessages,
  };
}
// Presence hook
/**
 * Join presence on `channelName` and track the channel's members.
 *
 * Once `connection` reports 'connected', this subscribes with
 * `{ presence: true }` and then publishes `userData` as this client's presence.
 *
 * Later changes to `userData` are re-published. They are compared by their
 * JSON serialisation, so passing a new but equal object on every render does
 * not send redundant updates.
 *
 * The connected status is sampled during render, as in useChannel. Cleanup
 * unsubscribes and resets `isJoined` and `members`.
 */
function usePresence(
  connection: RealtimeConnection | null,
  channelName: string,
  userData: Record<string, unknown>
) {
  const [members, setMembers] = useState<ChannelMember[]>([]);
  const [isJoined, setIsJoined] = useState(false);
  const channelRef = useRef<Channel<unknown> | null>(null);
  const userDataRef = useRef(userData);
  // Serialisation of the presence data last sent on the current subscription.
  const lastSentRef = useRef<string | null>(null);
  const userDataKey = JSON.stringify(userData);
  const isConnected = connection?.getState() === 'connected';
  useEffect(() => {
    userDataRef.current = userData;
  });
  useEffect(() => {
    if (!connection || connection.getState() !== 'connected') {
      return;
    }
    // Stops a subscribe() that settles after cleanup from touching state.
    let active = true;
    // Replace a known member's record or append a new one.
    const upsertMember = (member: ChannelMember) => {
      setMembers(prev =>
        prev.some(m => m.clientId === member.clientId)
          ? prev.map(m => (m.clientId === member.clientId ? member : m))
          : [...prev, member]
      );
    };
    const channel = new Channel<unknown>(
      channelName,
      connection,
      { presence: true },
      {
        onMessage: () => {},
        onPresenceJoin: upsertMember,
        onPresenceLeave: (member) => {
          setMembers(prev => prev.filter(m => m.clientId !== member.clientId));
        },
        onPresenceUpdate: upsertMember,
      }
    );
    channelRef.current = channel;
    channel.subscribe()
      .then(async () => {
        if (!active) return;
        setIsJoined(true);
        const current = userDataRef.current;
        lastSentRef.current = JSON.stringify(current);
        await channel.updatePresence(current);
      })
      .catch(console.error);
    return () => {
      active = false;
      channel.unsubscribe();
      if (channelRef.current === channel) {
        channelRef.current = null;
      }
      lastSentRef.current = null;
      setIsJoined(false);
      setMembers([]);
    };
  }, [connection, isConnected, channelName]);
  // Update presence when userData changes
  useEffect(() => {
    const channel = channelRef.current;
    if (!isJoined || !channel || lastSentRef.current === userDataKey) {
      return;
    }
    lastSentRef.current = userDataKey;
    Promise.resolve(channel.updatePresence(userDataRef.current)).catch(console.error);
  }, [userDataKey, isJoined]);
  return {
    members,
    isJoined,
    myPresence: userData,
  };
}
// Optimistic updates with real-time sync
/**
 * Keep a list of `{ id }` items in sync with `channelName`, applying local
 * create / update / delete operations optimistically.
 *
 * Each operation:
 * 1. is applied to local state immediately;
 * 2. is registered as pending under a fresh op id;
 * 3. is published with `{ ack: true }`.
 * If the send rejects, the change is rolled back and the error is rethrown.
 * Operations reject up front when there is no connection, instead of leaving
 * an optimistic change that can never be confirmed.
 *
 * Operations broadcast on the channel are applied as they arrive (server state
 * wins) and clear the matching pending op id.
 *
 * The subscription follows the connection's connected status, sampled during
 * render as in useChannel.
 */
function useOptimisticRealtime<T extends { id: string }>(
  connection: RealtimeConnection | null,
  channelName: string,
  initialData: T[]
) {
  const [data, setData] = useState<T[]>(initialData);
  const [pendingOps, setPendingOps] = useState<Set<string>>(new Set());
  // Latest committed list, used to capture rollback snapshots without making
  // the operation callbacks depend on (and be recreated with) `data`.
  const dataRef = useRef(data);
  useEffect(() => {
    dataRef.current = data;
  }, [data]);
  const isConnected = connection?.getState() === 'connected';
  // Forget a pending op id (confirmed by the server or rolled back).
  const clearPendingOp = useCallback((opId: string) => {
    setPendingOps(prev => {
      if (!prev.has(opId)) return prev;
      const next = new Set(prev);
      next.delete(opId);
      return next;
    });
  }, []);
  useEffect(() => {
    if (!connection || connection.getState() !== 'connected') {
      return;
    }
    const channel = new Channel<{ type: string; item: T; opId: string }>(
      channelName,
      connection,
      {},
      {
        onMessage: (message) => {
          // Remove from pending when server confirms
          clearPendingOp(message.opId);
          // Apply server state (may differ from optimistic)
          setData(prev => {
            switch (message.type) {
              case 'create': {
                // Replace optimistic with server version
                const existingIndex = prev.findIndex(i => i.id === message.item.id);
                if (existingIndex >= 0) {
                  return prev.map((item, i) => (i === existingIndex ? message.item : item));
                }
                return [...prev, message.item];
              }
              case 'update':
                return prev.map(item => (item.id === message.item.id ? message.item : item));
              case 'delete':
                return prev.filter(item => item.id !== message.item.id);
              default:
                return prev;
            }
          });
        },
      }
    );
    channel.subscribe().catch(console.error);
    return () => {
      channel.unsubscribe();
    };
  }, [connection, isConnected, channelName, clearPendingOp]);
  const optimisticCreate = useCallback(async (item: T) => {
    if (!connection) {
      throw new Error('Cannot create item: no realtime connection');
    }
    const opId = crypto.randomUUID();
    // Optimistic update
    setData(prev => [...prev, item]);
    setPendingOps(prev => new Set(prev).add(opId));
    // Send to server
    try {
      await connection.send({
        type: 'publish',
        channel: channelName,
        data: { type: 'create', item, opId },
      }, { ack: true });
    } catch (error) {
      // Rollback on failure
      setData(prev => prev.filter(i => i.id !== item.id));
      clearPendingOp(opId);
      throw error;
    }
  }, [connection, channelName, clearPendingOp]);
  const optimisticUpdate = useCallback(async (item: T) => {
    if (!connection) {
      throw new Error('Cannot update item: no realtime connection');
    }
    const opId = crypto.randomUUID();
    const previousItem = dataRef.current.find(i => i.id === item.id);
    // Optimistic update
    setData(prev => prev.map(i => (i.id === item.id ? item : i)));
    setPendingOps(prev => new Set(prev).add(opId));
    try {
      await connection.send({
        type: 'publish',
        channel: channelName,
        data: { type: 'update', item, opId },
      }, { ack: true });
    } catch (error) {
      // Rollback
      if (previousItem) {
        setData(prev => prev.map(i => (i.id === item.id ? previousItem : i)));
      }
      clearPendingOp(opId);
      throw error;
    }
  }, [connection, channelName, clearPendingOp]);
  const optimisticDelete = useCallback(async (itemId: string) => {
    if (!connection) {
      throw new Error('Cannot delete item: no realtime connection');
    }
    const opId = crypto.randomUUID();
    const previousItem = dataRef.current.find(i => i.id === itemId);
    // Optimistic delete
    setData(prev => prev.filter(i => i.id !== itemId));
    setPendingOps(prev => new Set(prev).add(opId));
    try {
      await connection.send({
        type: 'publish',
        channel: channelName,
        data: { type: 'delete', item: { id: itemId }, opId },
      }, { ack: true });
    } catch (error) {
      // Rollback (unless the item has reappeared meanwhile, e.g. via the server)
      if (previousItem) {
        setData(prev => (prev.some(i => i.id === itemId) ? prev : [...prev, previousItem]));
      }
      clearPendingOp(opId);
      throw error;
    }
  }, [connection, channelName, clearPendingOp]);
  return {
    data,
    pendingOps: pendingOps.size,
    hasPendingOps: pendingOps.size > 0,
    create: optimisticCreate,
    update: optimisticUpdate,
    delete: optimisticDelete,
  };
}
export {
useRealtimeConnection,
useChannel,
usePresence,
useOptimisticRealtime,
};
Server-Side Gateway Implementation
WebSocket Gateway with Redis Pub/Sub
// server/gateway.ts
import { WebSocketServer, WebSocket } from 'ws';
import { createClient } from 'redis';
import { v4 as uuid } from 'uuid';
/** One WebSocket connection tracked by this gateway instance. */
interface Client {
  /** Gateway-assigned connection id (a fresh uuid per socket). */
  id: string;
  /** User id resolved from the connection's token. */
  userId: string;
  /** Channels this connection is currently subscribed to. */
  channels: Set<string>;
  /** Date.now() of the last pong or client 'ping' message seen. */
  lastPing: number;
  /** The underlying socket. */
  ws: WebSocket;
}
/** Settings for a WebSocketGateway instance. */
interface GatewayConfig {
  /** TCP port the WebSocket server listens on. */
  port: number;
  /** Redis connection URL (pub/sub, presence hashes and event streams). */
  redisUrl: string;
  /** Milliseconds between heartbeat checks / server pings. */
  heartbeatInterval: number;
  /** Heartbeat timeout in milliseconds; see checkHeartbeats for how it is applied. */
  heartbeatTimeout: number;
}
/**
 * WebSocket gateway that fans channel traffic out across instances via Redis.
 *
 * - Each gateway pattern-subscribes to `channel:*` and forwards events to its
 *   local subscribers.
 * - Presence lives in the `presence:<channel>` hashes.
 * - Published events are appended to the `events:<channel>` streams. The
 *   stream entry ID is the event ID, so clients can resume history from it.
 */
class WebSocketGateway {
  private wss: WebSocketServer;
  private clients: Map<string, Client> = new Map();
  private channelSubscribers: Map<string, Set<string>> = new Map();
  private redisPub: ReturnType<typeof createClient>;
  private redisSub: ReturnType<typeof createClient>;
  private gatewayId: string;

  /**
   * Create the WebSocket server on `config.port` and two Redis clients: one
   * for commands and publishing, one dedicated to the pattern subscription.
   * Startup continues asynchronously in initialize(); failures are logged.
   */
  constructor(private config: GatewayConfig) {
    this.gatewayId = uuid();
    this.wss = new WebSocketServer({ port: config.port });
    this.redisPub = createClient({ url: config.redisUrl });
    this.redisSub = createClient({ url: config.redisUrl });
    // node-redis throws 'error' events that have no listener, which would
    // bring the whole gateway down on a transient Redis failure.
    this.redisPub.on('error', (error) => {
      console.error(`Gateway ${this.gatewayId} Redis publisher error:`, error);
    });
    this.redisSub.on('error', (error) => {
      console.error(`Gateway ${this.gatewayId} Redis subscriber error:`, error);
    });
    this.initialize().catch((error) => {
      console.error(`Gateway ${this.gatewayId} failed to initialize:`, error);
    });
  }

  /** Connect Redis, subscribe to cross-gateway traffic, accept sockets, start heartbeats. */
  private async initialize() {
    await this.redisPub.connect();
    await this.redisSub.connect();
    // Subscribe to cross-gateway messages
    await this.redisSub.pSubscribe('channel:*', (message, channel) => {
      this.handleRedisMessage(channel, message);
    });
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req);
    });
    // Heartbeat checker
    setInterval(() => {
      this.checkHeartbeats();
    }, this.config.heartbeatInterval);
    console.log(`Gateway ${this.gatewayId} started on port ${this.config.port}`);
  }

  /** Authenticate a new socket from its `?token=` query parameter and register it. */
  private handleConnection(ws: WebSocket, req: { url?: string }) {
    const token = new URL(req.url ?? '/', 'http://localhost').searchParams.get('token');
    // Validate token (simplified)
    const userId = this.validateToken(token);
    if (!userId) {
      ws.close(4001, 'Authentication failed');
      return;
    }
    const clientId = uuid();
    const client: Client = {
      id: clientId,
      userId,
      ws,
      channels: new Set(),
      lastPing: Date.now(),
    };
    this.clients.set(clientId, client);
    ws.on('message', (data) => {
      void this.handleMessage(client, data.toString());
    });
    ws.on('close', () => {
      this.handleDisconnect(client);
    });
    // An 'error' event with no listener would be thrown as an uncaught exception.
    ws.on('error', (error) => {
      console.error(`Client ${clientId} socket error:`, error);
    });
    ws.on('pong', () => {
      client.lastPing = Date.now();
    });
    // Send connection acknowledgment
    this.send(client, {
      type: 'connected',
      clientId,
    });
  }

  /**
   * Parse and dispatch one client message. Async handlers are awaited, so a
   * requested ack (`_id`) is sent only after the operation succeeded. Any
   * failure is logged and no ack is sent.
   */
  private async handleMessage(client: Client, data: string): Promise<void> {
    try {
      const message = JSON.parse(data);
      switch (message.type) {
        case 'ping':
          client.lastPing = Date.now();
          this.send(client, { type: 'pong', t: message.t });
          break;
        case 'subscribe':
          await this.handleSubscribe(client, message);
          break;
        case 'unsubscribe':
          await this.handleUnsubscribe(client, message);
          break;
        case 'publish':
          await this.handlePublish(client, message);
          break;
        case 'presence_update':
          await this.handlePresenceUpdate(client, message);
          break;
        case 'sync':
          await this.handleSync(client, message);
          break;
        default:
          console.warn('Unknown message type:', message.type);
      }
      // Send ack if requested
      if (message._id) {
        this.send(client, { type: 'ack', id: message._id });
      }
    } catch (error) {
      console.error('Failed to handle message:', error);
    }
  }

  /** Extract the `channel` field of an inbound client message, rejecting non-strings. */
  private requireChannel(message: any): string {
    const channel = message?.channel;
    if (typeof channel !== 'string' || channel.length === 0) {
      throw new Error(`Message of type ${String(message?.type)} has no valid channel`);
    }
    return channel;
  }

  /** Drop `clientId` from a channel's local subscriber set, deleting the set once empty. */
  private removeSubscriber(channel: string, clientId: string) {
    const subscribers = this.channelSubscribers.get(channel);
    if (!subscribers) return;
    subscribers.delete(clientId);
    if (subscribers.size === 0) {
      this.channelSubscribers.delete(channel);
    }
  }

  /**
   * Subscribe the client to a channel. Optionally replays history after
   * `options.lastEventId` and joins presence.
   */
  private async handleSubscribe(client: Client, message: any) {
    const channel = this.requireChannel(message);
    const { options } = message;
    client.channels.add(channel);
    // Track subscriber
    let subscribers = this.channelSubscribers.get(channel);
    if (!subscribers) {
      subscribers = new Set();
      this.channelSubscribers.set(channel, subscribers);
    }
    subscribers.add(client.id);
    // Send history if requested; a history failure must not block presence.
    if (options?.history && options.lastEventId) {
      try {
        const history = await this.getChannelHistory(
          channel,
          options.lastEventId,
          options.history
        );
        this.send(client, {
          type: 'history',
          channel,
          data: history,
        });
      } catch (error) {
        console.error(`Failed to load history for ${channel}:`, error);
      }
    }
    // Handle presence
    if (options?.presence) {
      await this.handlePresenceJoin(client, channel);
    }
  }

  /**
   * Unsubscribe the client from a channel and announce its presence leave.
   * Ignored for channels the client never subscribed to, so no spurious
   * presence_leave is broadcast.
   */
  private async handleUnsubscribe(client: Client, message: any) {
    const channel = this.requireChannel(message);
    if (!client.channels.delete(channel)) {
      return;
    }
    this.removeSubscriber(channel, client.id);
    await this.handlePresenceLeave(client, channel);
  }

  /** Persist a client's message to the channel stream and fan it out via Redis. */
  private async handlePublish(client: Client, message: any) {
    const channel = this.requireChannel(message);
    const { data } = message;
    const body = {
      type: 'message',
      channel,
      data,
      senderId: client.userId,
      timestamp: Date.now(),
    };
    // The stream entry ID doubles as the event ID, so a client can later pass
    // it back as `lastEventId` and resume history with XRANGE.
    const id = await this.storeEvent(channel, body);
    const event = { id, ...body };
    // Publish to Redis for cross-gateway distribution
    await this.redisPub.publish(
      `channel:${channel}`,
      JSON.stringify(event)
    );
  }

  /** Forward a `channel:<name>` Redis payload to this gateway's local subscribers. */
  private handleRedisMessage(redisChannel: string, message: string) {
    const channelName = redisChannel.replace('channel:', '');
    const subscribers = this.channelSubscribers.get(channelName);
    if (!subscribers || subscribers.size === 0) return;
    // Payloads are JSON from gateways; drop anything else instead of throwing
    // inside the Redis callback.
    try {
      JSON.parse(message);
    } catch (error) {
      console.error(`Dropping malformed message on ${redisChannel}:`, error);
      return;
    }
    // Forward the already-serialised payload rather than re-encoding per client.
    for (const clientId of subscribers) {
      const client = this.clients.get(clientId);
      if (client) {
        this.sendRaw(client, message);
      }
    }
  }

  /** Record the client in the channel's presence hash, announce it, and send it the member list. */
  private async handlePresenceJoin(client: Client, channel: string) {
    const member = {
      userId: client.userId,
      clientId: client.id,
      data: {},
      joinedAt: Date.now(),
    };
    // Store presence in Redis
    await this.redisPub.hSet(
      `presence:${channel}`,
      client.id,
      JSON.stringify(member)
    );
    // Publish join event
    await this.redisPub.publish(
      `channel:${channel}`,
      JSON.stringify({
        type: 'presence_join',
        channel,
        member,
      })
    );
    // Send current presence state to joining client
    const members = await this.getChannelMembers(channel);
    this.send(client, {
      type: 'presence_sync',
      channel,
      members,
    });
  }

  /** Remove the client's presence entry for a channel and announce the leave. */
  private async handlePresenceLeave(client: Client, channel: string) {
    await this.redisPub.hDel(`presence:${channel}`, client.id);
    await this.redisPub.publish(
      `channel:${channel}`,
      JSON.stringify({
        type: 'presence_leave',
        channel,
        member: {
          userId: client.userId,
          clientId: client.id,
        },
      })
    );
  }

  /**
   * Replace the client's presence data on a channel it is subscribed to,
   * keeping the original `joinedAt`. Presence entries are only cleaned up for
   * subscribed channels (handleDisconnect), so updates elsewhere are rejected
   * rather than leaving an entry that is never removed.
   */
  private async handlePresenceUpdate(client: Client, message: any) {
    const channel = this.requireChannel(message);
    if (!client.channels.has(channel)) {
      throw new Error(`Client ${client.id} is not subscribed to ${channel}`);
    }
    const { data } = message;
    const key = `presence:${channel}`;
    const member = {
      userId: client.userId,
      clientId: client.id,
      data,
      joinedAt: await this.getJoinedAt(key, client.id),
    };
    await this.redisPub.hSet(
      key,
      client.id,
      JSON.stringify(member)
    );
    await this.redisPub.publish(
      `channel:${channel}`,
      JSON.stringify({
        type: 'presence_update',
        channel,
        member,
      })
    );
  }

  /** `joinedAt` of an existing presence entry, or now if there is none (or it is unreadable). */
  private async getJoinedAt(key: string, clientId: string): Promise<number> {
    const stored = await this.redisPub.hGet(key, clientId);
    if (stored) {
      try {
        const joinedAt = JSON.parse(stored).joinedAt;
        if (typeof joinedAt === 'number') return joinedAt;
      } catch {
        // Unreadable entry: treat this update as a fresh join.
      }
    }
    return Date.now();
  }

  /**
   * Reconcile a reconnecting client: send the events since `lastEventId` as a
   * delta when there are between 1 and 99 of them, otherwise send a full
   * snapshot.
   * NOTE(review): getEventsSince() is a stub. An empty result also triggers a
   * snapshot, so an up-to-date client cannot be told apart from an unknown
   * `lastEventId`.
   */
  private async handleSync(client: Client, message: any) {
    const { lastEventId, version } = message;
    // Determine if we can do delta sync
    const events = await this.getEventsSince(lastEventId);
    if (events.length > 0 && events.length < 100) {
      // Delta sync
      this.send(client, {
        type: 'sync_response',
        syncType: 'delta',
        events,
        latestEventId: events[events.length - 1].id,
        version: version + events.length,
      });
    } else {
      // Full snapshot needed
      const state = await this.getFullState(client.channels);
      this.send(client, {
        type: 'sync_response',
        syncType: 'snapshot',
        state,
        latestEventId: await this.getLatestEventId(),
        version: await this.getCurrentVersion(),
      });
    }
  }

  /**
   * Forget a client: local subscriptions plus a presence leave on every
   * subscribed channel. Called from both the socket 'close' event and
   * checkHeartbeats(); only the first call does the cleanup.
   */
  private handleDisconnect(client: Client) {
    if (!this.clients.delete(client.id)) {
      return;
    }
    for (const channel of client.channels) {
      this.removeSubscriber(channel, client.id);
      this.handlePresenceLeave(client, channel).catch((error) => {
        console.error(`Failed to publish presence leave for ${client.id} on ${channel}:`, error);
      });
    }
    client.channels.clear();
  }

  /**
   * Reap dead connections and ping live ones. Pings go out once per
   * interval, so a healthy client's `lastPing` can be almost a full interval
   * old when this runs. A client is therefore dead only once it has been
   * silent for `heartbeatInterval + heartbeatTimeout`.
   */
  private checkHeartbeats() {
    const now = Date.now();
    const deadline = this.config.heartbeatInterval + this.config.heartbeatTimeout;
    for (const [id, client] of this.clients) {
      if (now - client.lastPing > deadline) {
        console.log(`Client ${id} timed out`);
        client.ws.terminate();
        this.handleDisconnect(client);
      } else if (client.ws.readyState === WebSocket.OPEN) {
        client.ws.ping();
      }
    }
  }

  /** JSON-encode `data` and send it if the socket is open. */
  private send(client: Client, data: unknown) {
    this.sendRaw(client, JSON.stringify(data));
  }

  /** Send an already-serialised payload if the socket is open. */
  private sendRaw(client: Client, payload: string) {
    if (client.ws.readyState === WebSocket.OPEN) {
      client.ws.send(payload);
    }
  }

  /**
   * Resolve a token to a user id.
   * NOTE(review): placeholder. It accepts ANY non-empty token as 'user_123';
   * real JWT validation is required before production use.
   */
  private validateToken(token: string | null): string | null {
    // Implement JWT validation
    if (!token) return null;
    // Return userId from token
    return 'user_123'; // Placeholder
  }

  /**
   * Append an event to the channel's Redis stream, trim the stream to roughly
   * the latest 1000 entries, and return the new entry's stream ID.
   */
  private async storeEvent(channel: string, event: any): Promise<string> {
    const id = await this.redisPub.xAdd(
      `events:${channel}`,
      '*',
      { data: JSON.stringify(event) }
    );
    // Trim to keep only recent events
    await this.redisPub.xTrim(`events:${channel}`, 'MAXLEN', 1000);
    return id;
  }

  /**
   * Up to `limit` events stored after `lastEventId` (a stream entry ID, as
   * sent to clients in `event.id`), oldest first. Each event's `id` is set to
   * its stream entry ID.
   */
  private async getChannelHistory(
    channel: string,
    lastEventId: string,
    limit: number
  ): Promise<any[]> {
    const count = Number.isFinite(limit) && limit > 0 ? Math.floor(limit) : 0;
    if (count === 0) return [];
    // XRANGE's start bound is inclusive: fetch one extra entry and drop the
    // one the client already has.
    const entries = await this.redisPub.xRange(
      `events:${channel}`,
      lastEventId,
      '+',
      { COUNT: count + 1 }
    );
    return entries
      .filter(e => e.id !== lastEventId)
      .slice(0, count)
      .map(e => ({ ...JSON.parse(e.message.data), id: e.id }));
  }

  /** All presence entries stored for a channel. */
  private async getChannelMembers(channel: string): Promise<any[]> {
    const members = await this.redisPub.hGetAll(`presence:${channel}`);
    return Object.values(members).map(m => JSON.parse(m));
  }

  private async getEventsSince(lastEventId: string): Promise<any[]> {
    // Implementation depends on event storage strategy
    return [];
  }

  private async getFullState(channels: Set<string>): Promise<any> {
    // Implementation depends on state storage
    return {};
  }

  private async getLatestEventId(): Promise<string> {
    return uuid();
  }

  private async getCurrentVersion(): Promise<number> {
    return 1;
  }
}
export { WebSocketGateway };
Key Takeaways
- Connection resilience is non-negotiable: Implement exponential backoff with jitter, heartbeat monitoring, and automatic reconnection with state sync.
- Backpressure protects the client: Buffer, sample, or drop messages when the client can't keep up—never let the UI freeze.
- State reconciliation handles the offline gap: Track last event IDs and support both delta sync and full snapshots.
- Choose appropriate delivery guarantees: At-most-once for ephemeral data, at-least-once for important messages, exactly-once only when necessary.
- Ordering has costs: FIFO per-sender is usually sufficient; total ordering requires coordination and adds latency.
- Presence requires careful cleanup: Handle disconnects, timeouts, and multi-device scenarios.
- Redis enables horizontal scaling: Pub/Sub fans messages out across gateway instances, while shared Redis data structures (presence hashes, event streams) give every instance the same view of state.
- Optimistic updates improve perceived performance: Apply changes immediately, reconcile when the server confirms.
- React hooks should handle the connection lifecycle: Automatic subscription management, cleanup, and reconnection.
- Monitor everything: Connection states, message rates, backpressure levels, and reconciliation frequency.
Real-time systems require thinking about failure modes that don't exist in request-response architectures. Design for disconnection, not just connection.
What did you think?