System Design & Architecture
Part 1 of 9
Event-Driven Architecture Patterns in Node.js — Practical Guide
Introduction
Event-driven architecture isn't just a buzzword—it's how Node.js was designed to work. The event loop, non-blocking I/O, and the EventEmitter class are all built around the idea that things happen asynchronously, and your code should respond to events rather than wait for operations to complete.
But there's a gap between "Node.js uses events internally" and "building event-driven systems at scale." The patterns that work for handling HTTP requests don't automatically scale to distributed systems with message queues, event sourcing, and eventual consistency.
This guide bridges that gap. We'll start with Node.js's built-in event capabilities, then progress through increasingly sophisticated patterns: pub/sub, message queues, event sourcing, and CQRS. Each pattern includes practical, production-ready code that you can adapt for your own systems.
Event-Driven Fundamentals
What Event-Driven Architecture Is
EVENT-DRIVEN ARCHITECTURE:
════════════════════════════════════════════════════════════════════
TRADITIONAL (Request-Response):
───────────────────────────────
┌─────────┐ request ┌─────────┐ call ┌─────────┐
│ Client │──────────────►│Service A│───────────►│Service B│
│ │◄──────────────│ │◄───────────│ │
└─────────┘ response └─────────┘ return └─────────┘
• Synchronous
• Tightly coupled (A must know about B)
• A waits for B to respond
• If B is down, A fails
EVENT-DRIVEN (Publish-Subscribe):
─────────────────────────────────
┌─────────┐ ┌─────────┐
│Service A│───event──┐ ┌───►│Service B│
│(Producer)│ │ │ │(Consumer)│
└─────────┘ │ │ └─────────┘
▼ │
┌────────────────┴───┐
│ Event Bus │
│ (Queue/Broker) │
└────────────────┬───┘
│ │
│ │ ┌─────────┐
│ └───►│Service C│
│ │(Consumer)│
│ └─────────┘
│
│ ┌─────────┐
└─────────────►│Service D│
│(Consumer)│
└─────────┘
• Asynchronous
• Loosely coupled (A doesn't know who consumes)
• A publishes and moves on
• If B is down, events queue up
THE KEY INSIGHT:
────────────────
Events represent facts that have happened.
"UserRegistered", not "RegisterUser"
"OrderPlaced", not "PlaceOrder"
Producers don't tell consumers what to do.
They announce what happened. Consumers decide how to react.
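Node's built-in EventEmitter makes the distinction concrete: the producer emits a past-tense fact, and each subscriber decides on its own how to react. A minimal sketch (the order shape and handler reactions here are invented for illustration):

```typescript
import { EventEmitter } from 'node:events';

const bus = new EventEmitter();
const reactions: string[] = [];

// Consumer 1: inventory service reserves stock
bus.on('order.placed', (order: { id: string; items: number }) => {
  reactions.push(`inventory reserved for ${order.id}`);
});

// Consumer 2: email service queues a confirmation
bus.on('order.placed', (order: { id: string }) => {
  reactions.push(`confirmation queued for ${order.id}`);
});

// The producer announces what happened ("OrderPlaced"), not what to do.
// It neither knows nor cares who is listening.
bus.emit('order.placed', { id: 'ord-42', items: 3 });
```

Note that the producer never calls the inventory or email code directly; adding a third consumer is just another `on()` call.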
Why Node.js for Event-Driven Systems
NODE.JS EVENT-DRIVEN STRENGTHS:
════════════════════════════════════════════════════════════════════
1. BUILT-IN EVENT LOOP
────────────────────
Node.js is event-driven at its core.
Non-blocking I/O = naturally async.
// This doesn't block
fs.readFile('data.json', (err, data) => {
// Handle when ready
});
// Execution continues immediately
2. NATIVE EVENT EMITTER
─────────────────────
EventEmitter is in the standard library.
Familiar pattern for Node developers.
const events = new EventEmitter();
events.on('order.placed', handleOrder);
events.emit('order.placed', orderData);
3. EXCELLENT ASYNC SUPPORT
────────────────────────
async/await, Promises, streams.
Modern patterns for handling async events.
4. RICH ECOSYSTEM
───────────────
Libraries for every message broker:
• kafkajs (Kafka)
• amqplib (RabbitMQ)
• ioredis (Redis Pub/Sub)
• @aws-sdk (SQS, SNS, EventBridge)
• bullmq (Redis-based queues)
5. JSON-NATIVE
────────────
Events are typically JSON.
JavaScript handles JSON natively.
No serialization friction.
6. LIGHTWEIGHT PROCESSES
──────────────────────
Node.js has low memory footprint.
Easy to run many consumers.
Scales horizontally well.
Pattern 1: In-Process Events (EventEmitter)
Basic EventEmitter
// IN-PROCESS EVENT-DRIVEN ARCHITECTURE:
// ═══════════════════════════════════════════════════════════════
import { EventEmitter } from 'events';
// ─────────────────────────────────────────────────────────────────
// BASIC USAGE
// ─────────────────────────────────────────────────────────────────
const emitter = new EventEmitter();
// Subscribe to event
emitter.on('user.registered', (user) => {
console.log(`Welcome email sent to ${user.email}`);
});
emitter.on('user.registered', (user) => {
console.log(`Analytics tracked for ${user.id}`);
});
// Publish event
emitter.emit('user.registered', {
id: '123',
email: 'user@example.com',
name: 'John Doe',
});
// Output:
// Welcome email sent to user@example.com
// Analytics tracked for 123
// ─────────────────────────────────────────────────────────────────
// TYPED EVENT EMITTER (TypeScript)
// ─────────────────────────────────────────────────────────────────
import { EventEmitter } from 'events';
// Define event types
interface AppEvents {
'user.registered': [user: User];
'user.updated': [user: User, changes: Partial<User>];
'order.placed': [order: Order];
'order.shipped': [order: Order, tracking: string];
'payment.received': [payment: Payment];
'payment.failed': [payment: Payment, reason: string];
}
// Typed emitter class
class TypedEventEmitter<T extends Record<string, any[]>> {
private emitter = new EventEmitter();
on<K extends keyof T>(event: K, listener: (...args: T[K]) => void): this {
this.emitter.on(event as string, listener as any);
return this;
}
emit<K extends keyof T>(event: K, ...args: T[K]): boolean {
return this.emitter.emit(event as string, ...args);
}
off<K extends keyof T>(event: K, listener: (...args: T[K]) => void): this {
this.emitter.off(event as string, listener as any);
return this;
}
once<K extends keyof T>(event: K, listener: (...args: T[K]) => void): this {
this.emitter.once(event as string, listener as any);
return this;
}
}
// Usage with full type safety
const events = new TypedEventEmitter<AppEvents>();
events.on('user.registered', (user) => {
// user is typed as User
console.log(user.email);
});
events.emit('user.registered', { id: '1', email: 'test@example.com', name: 'Test' });
// Type error: Argument of type 'string' is not assignable to parameter of type 'User'
// events.emit('user.registered', 'invalid');
Application Event Bus
// APPLICATION EVENT BUS PATTERN:
// ═══════════════════════════════════════════════════════════════
// ─────────────────────────────────────────────────────────────────
// EVENT DEFINITIONS
// ─────────────────────────────────────────────────────────────────
// events/types.ts
export interface DomainEvent<T = unknown> {
id: string;
type: string;
timestamp: Date;
payload: T;
metadata: {
correlationId: string;
causationId?: string;
userId?: string;
};
}
export interface UserRegisteredEvent extends DomainEvent<{
userId: string;
email: string;
name: string;
}> {
type: 'user.registered';
}
export interface OrderPlacedEvent extends DomainEvent<{
orderId: string;
userId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
}> {
type: 'order.placed';
}
export type AppEvent = UserRegisteredEvent | OrderPlacedEvent;
// ─────────────────────────────────────────────────────────────────
// EVENT BUS IMPLEMENTATION
// ─────────────────────────────────────────────────────────────────
// events/event-bus.ts
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';
import { DomainEvent } from './types';
type EventHandler<T extends DomainEvent> = (event: T) => Promise<void>;
class EventBus {
private emitter = new EventEmitter();
private handlers = new Map<string, Set<EventHandler<any>>>();
constructor() {
// Increase listener limit for production use
this.emitter.setMaxListeners(100);
}
// Subscribe to events
subscribe<T extends DomainEvent>(
eventType: T['type'],
handler: EventHandler<T>
): () => void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, new Set());
}
this.handlers.get(eventType)!.add(handler);
const wrappedHandler = async (event: T) => {
try {
await handler(event);
} catch (error) {
console.error(`Error handling ${eventType}:`, error);
// In production: send to error tracking, maybe retry
}
};
this.emitter.on(eventType, wrappedHandler);
// Return unsubscribe function
return () => {
this.handlers.get(eventType)?.delete(handler);
this.emitter.off(eventType, wrappedHandler);
};
}
// Publish events
async publish<T extends DomainEvent>(event: Omit<T, 'id' | 'timestamp'>): Promise<void> {
const fullEvent: DomainEvent = {
...event,
id: randomUUID(),
timestamp: new Date(),
} as T;
console.log(`Publishing event: ${event.type}`, { eventId: fullEvent.id });
// Emit asynchronously to not block the publisher
setImmediate(() => {
this.emitter.emit(event.type, fullEvent);
});
}
// Publish and wait for all handlers to complete
async publishAndWait<T extends DomainEvent>(
event: Omit<T, 'id' | 'timestamp'>
): Promise<void> {
const fullEvent: DomainEvent = {
...event,
id: randomUUID(),
timestamp: new Date(),
} as T;
const handlers = this.handlers.get(event.type);
if (!handlers || handlers.size === 0) {
return;
}
await Promise.all(
Array.from(handlers).map(handler => handler(fullEvent))
);
}
}
// Singleton instance
export const eventBus = new EventBus();
// ─────────────────────────────────────────────────────────────────
// EVENT HANDLERS (Subscribers)
// ─────────────────────────────────────────────────────────────────
// handlers/email-handler.ts
import { eventBus } from '../events/event-bus';
import { UserRegisteredEvent, OrderPlacedEvent } from '../events/types';
import { emailService } from '../services/email';
// Subscribe to user registration
eventBus.subscribe<UserRegisteredEvent>('user.registered', async (event) => {
await emailService.sendWelcomeEmail({
to: event.payload.email,
name: event.payload.name,
});
console.log(`Welcome email sent for event ${event.id}`);
});
// Subscribe to order placed
eventBus.subscribe<OrderPlacedEvent>('order.placed', async (event) => {
await emailService.sendOrderConfirmation({
orderId: event.payload.orderId,
userId: event.payload.userId,
total: event.payload.total,
});
});
// handlers/analytics-handler.ts
import { eventBus } from '../events/event-bus';
import { UserRegisteredEvent, OrderPlacedEvent } from '../events/types';
import { analytics } from '../services/analytics';
eventBus.subscribe<UserRegisteredEvent>('user.registered', async (event) => {
await analytics.track('User Registered', {
userId: event.payload.userId,
timestamp: event.timestamp,
});
});
eventBus.subscribe<OrderPlacedEvent>('order.placed', async (event) => {
await analytics.track('Order Placed', {
orderId: event.payload.orderId,
userId: event.payload.userId,
total: event.payload.total,
itemCount: event.payload.items.length,
});
});
// ─────────────────────────────────────────────────────────────────
// USAGE IN APPLICATION CODE
// ─────────────────────────────────────────────────────────────────
// services/user-service.ts
import { randomUUID } from 'crypto';
import { eventBus } from '../events/event-bus';
import { UserRegisteredEvent } from '../events/types';
class UserService {
async registerUser(input: RegisterUserInput): Promise<User> {
// Create user in database
const user = await this.userRepository.create({
email: input.email,
name: input.name,
passwordHash: await hash(input.password),
});
// Publish event - handlers react asynchronously
await eventBus.publish<UserRegisteredEvent>({
type: 'user.registered',
payload: {
userId: user.id,
email: user.email,
name: user.name,
},
metadata: {
correlationId: input.correlationId || randomUUID(),
userId: user.id,
},
});
// Return immediately - email sending etc. happens async
return user;
}
}
When to Use In-Process Events
IN-PROCESS EVENTS - USE CASES:
════════════════════════════════════════════════════════════════════
✓ GOOD FOR:
───────────
• Decoupling within a single application
• Side effects that don't need guaranteed delivery
- Logging
- Analytics tracking
- Cache invalidation
• Fire-and-forget operations
• Breaking up large functions
• Plugin/extension systems
✗ NOT FOR:
──────────
• Cross-service communication (events lost if process dies)
• Operations that MUST happen (use queues with persistence)
• Distributed systems
• Anything requiring delivery guarantees
THE LIMITATION:
───────────────
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Process Memory │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Publish ──► EventEmitter ──► Handler │ │
│ │ │ │
│ │ Events exist only in memory. │ │
│ │ If process crashes, pending events are lost. │ │
│ │ No replay capability. │ │
│ │ No delivery guarantee. │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ For durability, you need external message infrastructure. │
│ │
└─────────────────────────────────────────────────────────────────┘
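The limitation is directly observable: `emit()` returns `false` when no listener handled the event, and an event emitted before a subscriber attaches is gone for good. A small sketch:

```typescript
import { EventEmitter } from 'node:events';

const bus = new EventEmitter();

// Emitted before any listener exists: the event is lost forever.
// emit() returning false is the only signal you get.
const delivered = bus.emit('user.registered', { id: '1' });
// delivered === false

const received: string[] = [];
bus.on('user.registered', (u: { id: string }) => received.push(u.id));

// Only events emitted *after* subscription are seen.
const deliveredNow = bus.emit('user.registered', { id: '2' });
// deliveredNow === true; received === ['2']
```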
Pattern 2: Message Queues
Redis Pub/Sub
// REDIS PUB/SUB PATTERN:
// ═══════════════════════════════════════════════════════════════
import Redis from 'ioredis';
// ─────────────────────────────────────────────────────────────────
// BASIC PUB/SUB
// ─────────────────────────────────────────────────────────────────
// Publisher
const publisher = new Redis();
async function publishEvent(channel: string, event: object) {
await publisher.publish(channel, JSON.stringify(event));
}
// Subscriber (separate connection required for Redis pub/sub)
const subscriber = new Redis();
subscriber.subscribe('user-events', 'order-events', (err, count) => {
console.log(`Subscribed to ${count} channels`);
});
subscriber.on('message', (channel, message) => {
const event = JSON.parse(message);
console.log(`Received on ${channel}:`, event);
});
// ─────────────────────────────────────────────────────────────────
// STRUCTURED EVENT PUBLISHER
// ─────────────────────────────────────────────────────────────────
// publishers/redis-publisher.ts
import Redis from 'ioredis';
import { DomainEvent } from '../events/types';
class RedisEventPublisher {
private redis: Redis;
constructor(redisUrl: string) {
this.redis = new Redis(redisUrl);
}
async publish<T extends DomainEvent>(event: T): Promise<void> {
const channel = this.getChannel(event.type);
await this.redis.publish(channel, JSON.stringify(event));
// Also store in a stream for replay capability (optional)
await this.redis.xadd(
`events:${event.type}`,
'*',
'event', JSON.stringify(event)
);
}
private getChannel(eventType: string): string {
// user.registered -> user-events
const [domain] = eventType.split('.');
return `${domain}-events`;
}
async close(): Promise<void> {
await this.redis.quit();
}
}
export const eventPublisher = new RedisEventPublisher(process.env.REDIS_URL!);
// ─────────────────────────────────────────────────────────────────
// STRUCTURED EVENT SUBSCRIBER
// ─────────────────────────────────────────────────────────────────
// subscribers/redis-subscriber.ts
import Redis from 'ioredis';
import { DomainEvent } from '../events/types';
type EventHandler<T extends DomainEvent = DomainEvent> = (event: T) => Promise<void>;
export class RedisEventSubscriber {
private subscriber: Redis;
private handlers = new Map<string, EventHandler[]>();
constructor(redisUrl: string) {
this.subscriber = new Redis(redisUrl);
this.setupMessageHandler();
}
private setupMessageHandler() {
this.subscriber.on('message', async (channel, message) => {
try {
const event: DomainEvent = JSON.parse(message);
await this.handleEvent(event);
} catch (error) {
console.error('Error processing event:', error);
}
});
}
private async handleEvent(event: DomainEvent) {
const handlers = this.handlers.get(event.type) || [];
await Promise.all(
handlers.map(async (handler) => {
try {
await handler(event);
} catch (error) {
console.error(`Handler error for ${event.type}:`, error);
// In production: retry logic, dead letter queue
}
})
);
}
async subscribe(eventType: string, handler: EventHandler): Promise<void> {
const channel = this.getChannel(eventType);
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
await this.subscriber.subscribe(channel);
}
this.handlers.get(eventType)!.push(handler);
}
private getChannel(eventType: string): string {
const [domain] = eventType.split('.');
return `${domain}-events`;
}
async close(): Promise<void> {
await this.subscriber.quit();
}
}
// ─────────────────────────────────────────────────────────────────
// USAGE
// ─────────────────────────────────────────────────────────────────
// Service A: Publishing
import { eventPublisher } from './publishers/redis-publisher';
async function registerUser(input: RegisterUserInput) {
const user = await db.users.create(input);
await eventPublisher.publish({
id: randomUUID(),
type: 'user.registered',
timestamp: new Date(),
payload: { userId: user.id, email: user.email, name: user.name },
metadata: { correlationId: randomUUID() },
});
return user;
}
// Service B: Subscribing
import { RedisEventSubscriber } from './subscribers/redis-subscriber';
const subscriber = new RedisEventSubscriber(process.env.REDIS_URL!);
// Email service listens for user events
subscriber.subscribe('user.registered', async (event) => {
await sendWelcomeEmail(event.payload.email, event.payload.name);
});
// Analytics service also listens
subscriber.subscribe('user.registered', async (event) => {
await trackUserRegistration(event.payload.userId);
});
BullMQ (Redis-Based Job Queue)
// BULLMQ - PERSISTENT JOB QUEUE:
// ═══════════════════════════════════════════════════════════════
import { Queue, Worker, Job } from 'bullmq';
import Redis from 'ioredis';
// ─────────────────────────────────────────────────────────────────
// QUEUE SETUP
// ─────────────────────────────────────────────────────────────────
const connection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // Required for BullMQ
});
// Create queues for different event types
const emailQueue = new Queue('email-jobs', { connection });
const analyticsQueue = new Queue('analytics-jobs', { connection });
const notificationQueue = new Queue('notification-jobs', { connection });
// ─────────────────────────────────────────────────────────────────
// JOB TYPES
// ─────────────────────────────────────────────────────────────────
interface WelcomeEmailJob {
type: 'welcome-email';
userId: string;
email: string;
name: string;
}
interface OrderConfirmationJob {
type: 'order-confirmation';
orderId: string;
email: string;
}
interface PasswordResetJob {
type: 'password-reset';
email: string;
resetToken: string;
}
type EmailJob = WelcomeEmailJob | OrderConfirmationJob | PasswordResetJob;
// ─────────────────────────────────────────────────────────────────
// PRODUCER (Adding jobs to queue)
// ─────────────────────────────────────────────────────────────────
class EmailJobProducer {
constructor(private queue: Queue) {}
async sendWelcomeEmail(user: { id: string; email: string; name: string }) {
await this.queue.add(
'welcome-email',
{
type: 'welcome-email',
userId: user.id,
email: user.email,
name: user.name,
} as WelcomeEmailJob,
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 1000, // Keep last 1000 failed jobs
}
);
}
async sendOrderConfirmation(order: { id: string; userEmail: string }) {
await this.queue.add(
'order-confirmation',
{
type: 'order-confirmation',
orderId: order.id,
email: order.userEmail,
} as OrderConfirmationJob,
{
priority: 1, // High priority
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
}
);
}
// Delayed job
async schedulePasswordReset(email: string, resetToken: string) {
await this.queue.add(
'password-reset',
{
type: 'password-reset',
email,
resetToken,
} as PasswordResetJob,
{
delay: 0, // Immediate, but could be delayed
attempts: 3,
}
);
}
}
export const emailProducer = new EmailJobProducer(emailQueue);
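The `attempts`/`backoff` options above define a retry schedule. BullMQ's built-in exponential strategy computes the n-th retry delay as roughly `delay * 2^(n - 1)` (check the BullMQ docs for your version); a small sketch of the resulting schedule:

```typescript
// Sketch of the delay produced by BullMQ's built-in exponential
// backoff: delay * 2^(attemptsMade - 1). Illustrative only — the
// real computation lives inside BullMQ.
function exponentialBackoff(baseDelayMs: number, attemptsMade: number): number {
  return Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs);
}

// With { attempts: 3, backoff: { type: 'exponential', delay: 1000 } }:
const schedule = [1, 2, 3].map((n) => exponentialBackoff(1000, n));
// First retry after 1s, second after 2s, third after 4s
```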
// ─────────────────────────────────────────────────────────────────
// WORKER (Processing jobs)
// ─────────────────────────────────────────────────────────────────
const emailWorker = new Worker<EmailJob>(
'email-jobs',
async (job: Job<EmailJob>) => {
console.log(`Processing job ${job.id}: ${job.data.type}`);
switch (job.data.type) {
case 'welcome-email':
await sendWelcomeEmail(job.data);
break;
case 'order-confirmation':
await sendOrderConfirmation(job.data);
break;
case 'password-reset':
await sendPasswordResetEmail(job.data);
break;
default:
throw new Error(`Unknown job type: ${(job.data as any).type}`);
}
return { success: true, processedAt: new Date() };
},
{
connection,
concurrency: 10, // Process 10 jobs in parallel
limiter: {
max: 100, // Max 100 jobs
duration: 1000, // Per second (rate limiting)
},
}
);
// Event handlers for worker
emailWorker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
emailWorker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed:`, err);
// In production: alert, log to error tracking
});
emailWorker.on('error', (err) => {
console.error('Worker error:', err);
});
// ─────────────────────────────────────────────────────────────────
// EMAIL SENDING FUNCTIONS
// ─────────────────────────────────────────────────────────────────
async function sendWelcomeEmail(data: WelcomeEmailJob) {
// Actual email sending logic
console.log(`Sending welcome email to ${data.email}`);
await emailClient.send({
to: data.email,
template: 'welcome',
data: { name: data.name },
});
}
async function sendOrderConfirmation(data: OrderConfirmationJob) {
const order = await db.orders.findById(data.orderId);
await emailClient.send({
to: data.email,
template: 'order-confirmation',
data: { order },
});
}
async function sendPasswordResetEmail(data: PasswordResetJob) {
await emailClient.send({
to: data.email,
template: 'password-reset',
data: {
resetLink: `https://example.com/reset?token=${data.resetToken}`,
},
});
}
// ─────────────────────────────────────────────────────────────────
// USAGE IN APPLICATION
// ─────────────────────────────────────────────────────────────────
// In user registration flow
async function registerUser(input: RegisterUserInput) {
const user = await db.users.create(input);
// Add email job to queue - returns immediately
await emailProducer.sendWelcomeEmail({
id: user.id,
email: user.email,
name: user.name,
});
return user; // User gets response fast, email sends async
}
// Graceful shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down...');
await emailWorker.close();
await emailQueue.close();
process.exit(0);
});
RabbitMQ
// RABBITMQ EVENT-DRIVEN PATTERN:
// ═══════════════════════════════════════════════════════════════
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib';
// ─────────────────────────────────────────────────────────────────
// CONNECTION MANAGEMENT
// ─────────────────────────────────────────────────────────────────
class RabbitMQConnection {
private connection: Connection | null = null;
private channel: Channel | null = null;
async connect(): Promise<void> {
this.connection = await amqp.connect(process.env.RABBITMQ_URL!);
this.channel = await this.connection.createChannel();
// Handle connection errors
this.connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
});
this.connection.on('close', () => {
console.log('RabbitMQ connection closed');
});
}
getChannel(): Channel {
if (!this.channel) {
throw new Error('RabbitMQ not connected');
}
return this.channel;
}
async close(): Promise<void> {
await this.channel?.close();
await this.connection?.close();
}
}
export const rabbitmq = new RabbitMQConnection();
// ─────────────────────────────────────────────────────────────────
// EXCHANGE AND QUEUE SETUP
// ─────────────────────────────────────────────────────────────────
async function setupExchangesAndQueues(channel: Channel) {
// Topic exchange for events
await channel.assertExchange('domain-events', 'topic', {
durable: true,
});
// Queue for email service
await channel.assertQueue('email-service-queue', {
durable: true,
deadLetterExchange: 'dead-letters',
});
// Bind queue to events it cares about
await channel.bindQueue('email-service-queue', 'domain-events', 'user.*');
await channel.bindQueue('email-service-queue', 'domain-events', 'order.*');
// Queue for analytics service
await channel.assertQueue('analytics-service-queue', { durable: true });
await channel.bindQueue('analytics-service-queue', 'domain-events', '#'); // All events
// Dead letter exchange for failed messages
await channel.assertExchange('dead-letters', 'fanout', { durable: true });
await channel.assertQueue('dead-letter-queue', { durable: true });
await channel.bindQueue('dead-letter-queue', 'dead-letters', '');
}
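The binding keys above use AMQP topic semantics: `*` matches exactly one dot-separated word and `#` matches zero or more. A tiny matcher (purely an illustration for reasoning about bindings, not broker code) makes it easy to sanity-check which routing keys a binding will catch:

```typescript
// Illustrative matcher for AMQP topic binding patterns:
// '*' consumes exactly one word, '#' consumes zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');
  const match = (pi: number, ki: number): boolean => {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' may match zero or more words: try every possible skip
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    if (p[pi] !== '*' && p[pi] !== k[ki]) return false;
    return match(pi + 1, ki + 1);
  };
  return match(0, 0);
}
```

So `user.*` catches `user.registered` but not `user.profile.updated`, while `#` catches everything — which is why the analytics queue above receives all events.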
// ─────────────────────────────────────────────────────────────────
// PUBLISHER
// ─────────────────────────────────────────────────────────────────
class RabbitMQPublisher {
constructor(private channel: Channel) {}
async publish<T extends DomainEvent>(event: T): Promise<void> {
const routingKey = event.type; // e.g., 'user.registered'
this.channel.publish(
'domain-events',
routingKey,
Buffer.from(JSON.stringify(event)),
{
persistent: true, // Survive broker restart
contentType: 'application/json',
messageId: event.id,
timestamp: event.timestamp.getTime(),
headers: {
correlationId: event.metadata.correlationId,
},
}
);
console.log(`Published ${routingKey}:`, event.id);
}
}
// ─────────────────────────────────────────────────────────────────
// CONSUMER
// ─────────────────────────────────────────────────────────────────
type MessageHandler = (event: DomainEvent) => Promise<void>;
class RabbitMQConsumer {
private handlers = new Map<string, MessageHandler>();
constructor(
private channel: Channel,
private queueName: string
) {}
registerHandler(eventType: string, handler: MessageHandler): void {
this.handlers.set(eventType, handler);
}
async start(): Promise<void> {
// Prefetch: only get 10 messages at a time
await this.channel.prefetch(10);
await this.channel.consume(
this.queueName,
async (msg: ConsumeMessage | null) => {
if (!msg) return;
try {
const event: DomainEvent = JSON.parse(msg.content.toString());
console.log(`Received ${event.type}:`, event.id);
const handler = this.handlers.get(event.type);
if (handler) {
await handler(event);
this.channel.ack(msg); // Acknowledge successful processing
} else {
console.warn(`No handler for ${event.type}`);
this.channel.ack(msg); // Acknowledge to remove from queue
}
} catch (error) {
console.error('Error processing message:', error);
// Retry logic: check if we should retry or dead-letter
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) as number;
if (retryCount < 3) {
// Retry: republish with incremented retry count
this.channel.publish(
'',
this.queueName,
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount + 1,
},
}
);
this.channel.ack(msg);
} else {
// Give up: send to dead letter queue
this.channel.nack(msg, false, false);
}
}
}
);
console.log(`Consumer started for ${this.queueName}`);
}
}
// ─────────────────────────────────────────────────────────────────
// USAGE
// ─────────────────────────────────────────────────────────────────
// Setup (run once on startup)
async function startEmailService() {
await rabbitmq.connect();
const channel = rabbitmq.getChannel();
await setupExchangesAndQueues(channel);
const consumer = new RabbitMQConsumer(channel, 'email-service-queue');
consumer.registerHandler('user.registered', async (event) => {
const { email, name } = event.payload as any;
await sendWelcomeEmail(email, name);
});
consumer.registerHandler('order.placed', async (event) => {
const { orderId, userEmail } = event.payload as any;
await sendOrderConfirmationEmail(orderId, userEmail);
});
await consumer.start();
}
// In application code (publisher)
async function registerUser(input: RegisterUserInput) {
const user = await db.users.create(input);
const publisher = new RabbitMQPublisher(rabbitmq.getChannel());
await publisher.publish({
id: randomUUID(),
type: 'user.registered',
timestamp: new Date(),
payload: {
userId: user.id,
email: user.email,
name: user.name,
},
metadata: {
correlationId: randomUUID(),
},
});
return user;
}
Pattern 3: Event Sourcing
Core Concepts
EVENT SOURCING FUNDAMENTALS:
════════════════════════════════════════════════════════════════════
TRADITIONAL (State-Based):
──────────────────────────
Store current state:
┌─────────────────────────────────────────────────────────────────┐
│ users table │
│ ┌────────┬───────────────┬─────────┬──────────────────────┐ │
│ │ id │ email │ balance │ updated_at │ │
│ ├────────┼───────────────┼─────────┼──────────────────────┤ │
│ │ 123 │ user@test.com │ 150.00 │ 2024-01-15 10:30:00 │ │
│ └────────┴───────────────┴─────────┴──────────────────────┘ │
│ │
│ How did balance become $150? Who knows! │
│ History is lost. │
│ │
└─────────────────────────────────────────────────────────────────┘
EVENT SOURCING:
───────────────
Store events (facts that happened):
┌─────────────────────────────────────────────────────────────────┐
│ account_events table │
│ ┌────────┬──────────────────┬────────────┬──────────────────┐ │
│ │ seq │ type │ data │ timestamp │ │
│ ├────────┼──────────────────┼────────────┼──────────────────┤ │
│ │ 1 │ AccountOpened │ {userId} │ 2024-01-10 09:00│ │
│ │ 2 │ MoneyDeposited │ {amt: 100} │ 2024-01-11 14:00│ │
│ │ 3 │ MoneyWithdrawn │ {amt: 25} │ 2024-01-12 10:00│ │
│ │ 4 │ MoneyDeposited │ {amt: 75} │ 2024-01-15 10:30│ │
│ └────────┴──────────────────┴────────────┴──────────────────┘ │
│ │
│ Current balance: 100 - 25 + 75 = $150 │
│ Full history preserved. Auditable. Replayable. │
│ │
└─────────────────────────────────────────────────────────────────┘
KEY PRINCIPLES:
───────────────
1. Events are immutable facts (never updated or deleted)
2. Current state is derived by replaying events
3. Events are the source of truth
4. State can be rebuilt at any point in time
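The "replay to derive state" principle is small enough to show directly: fold the table's event history and the balance falls out. (The event shapes here are simplified, illustrative stand-ins for the fuller definitions in the implementation below.)

```typescript
// Simplified, illustrative event shapes.
type LedgerEvent =
  | { type: 'AccountOpened'; userId: string }
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; amount: number };

// Current state is a pure function of the event history.
function currentBalance(events: LedgerEvent[]): number {
  let balance = 0;
  for (const event of events) {
    if (event.type === 'MoneyDeposited') balance += event.amount;
    else if (event.type === 'MoneyWithdrawn') balance -= event.amount;
    // AccountOpened contributes nothing; balance starts at 0
  }
  return balance;
}

const history: LedgerEvent[] = [
  { type: 'AccountOpened', userId: '123' },
  { type: 'MoneyDeposited', amount: 100 },
  { type: 'MoneyWithdrawn', amount: 25 },
  { type: 'MoneyDeposited', amount: 75 },
];

// Full replay: 100 - 25 + 75 = 150
// Replaying a prefix rebuilds state at any point in time:
// currentBalance(history.slice(0, 3)) === 75
```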
Event Sourcing Implementation
// EVENT SOURCING IN NODE.JS:
// ═══════════════════════════════════════════════════════════════
// ─────────────────────────────────────────────────────────────────
// EVENT DEFINITIONS
// ─────────────────────────────────────────────────────────────────
interface Event<T = unknown> {
id: string;
aggregateId: string;
type: string;
data: T;
timestamp: Date;
version: number;
metadata: Record<string, unknown>;
}
// Account events
interface AccountOpenedEvent extends Event<{
ownerId: string;
initialBalance: number;
}> {
type: 'AccountOpened';
}
interface MoneyDepositedEvent extends Event<{
amount: number;
description: string;
}> {
type: 'MoneyDeposited';
}
interface MoneyWithdrawnEvent extends Event<{
amount: number;
description: string;
}> {
type: 'MoneyWithdrawn';
}
interface AccountClosedEvent extends Event<{
reason: string;
}> {
type: 'AccountClosed';
}
type AccountEvent =
| AccountOpenedEvent
| MoneyDepositedEvent
| MoneyWithdrawnEvent
| AccountClosedEvent;
// ─────────────────────────────────────────────────────────────────
// EVENT STORE
// ─────────────────────────────────────────────────────────────────
interface EventStore {
append(aggregateId: string, events: Event[], expectedVersion: number): Promise<void>;
getEvents(aggregateId: string, fromVersion?: number): Promise<Event[]>;
getAllEvents(fromPosition?: number): Promise<Event[]>;
}
// PostgreSQL implementation
import { Pool } from 'pg';
class ConcurrencyError extends Error {}
class PostgresEventStore implements EventStore {
constructor(private db: Pool) {}
async append(
aggregateId: string,
events: Event[],
expectedVersion: number
): Promise<void> {
const client = await this.db.connect();
try {
await client.query('BEGIN');
// Optimistic concurrency check
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE aggregate_id = $1',
[aggregateId]
);
const currentVersion = result.rows[0]?.current_version ?? -1;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`
);
}
// Insert events
for (const event of events) {
await client.query(
`INSERT INTO events (id, aggregate_id, type, data, timestamp, version, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
event.id,
event.aggregateId,
event.type,
JSON.stringify(event.data),
event.timestamp,
event.version,
JSON.stringify(event.metadata),
]
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(aggregateId: string, fromVersion = 0): Promise<Event[]> {
const result = await this.db.query(
`SELECT * FROM events
WHERE aggregate_id = $1 AND version >= $2
ORDER BY version ASC`,
[aggregateId, fromVersion]
);
return result.rows.map(this.rowToEvent);
}
async getAllEvents(fromPosition = 0): Promise<Event[]> {
// Event ids are UUIDs and have no meaningful ordering, so they cannot
// serve as a cursor. Use a monotonically increasing `position` column
// (e.g. BIGSERIAL) for the global stream instead.
const result = await this.db.query(
`SELECT * FROM events
WHERE position > $1
ORDER BY position ASC`,
[fromPosition]
);
return result.rows.map(this.rowToEvent);
}
private rowToEvent(row: any): Event {
return {
id: row.id,
aggregateId: row.aggregate_id,
type: row.type,
data: row.data,
timestamp: row.timestamp,
version: row.version,
metadata: row.metadata,
};
}
}
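The store above assumes an `events` table roughly like the following. This schema is an assumption reconstructed from the queries (the exact column types and the `position` cursor are not dictated by the code), so adapt it to your own conventions:

```sql
-- Hypothetical schema backing PostgresEventStore.
-- `position` gives a global ordering for reading the whole stream;
-- the UNIQUE (aggregate_id, version) constraint enforces the optimistic
-- concurrency check in append() even under concurrent writers.
CREATE TABLE events (
  position     BIGSERIAL   PRIMARY KEY,
  id           UUID        NOT NULL UNIQUE,
  aggregate_id UUID        NOT NULL,
  type         TEXT        NOT NULL,
  data         JSONB       NOT NULL,
  timestamp    TIMESTAMPTZ NOT NULL,
  version      INT         NOT NULL,
  metadata     JSONB       NOT NULL DEFAULT '{}',
  UNIQUE (aggregate_id, version)
);
```

Note that the unique constraint is a second line of defense: even if two processes pass the version check simultaneously, only one insert can succeed.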
// ─────────────────────────────────────────────────────────────────
// AGGREGATE
// ─────────────────────────────────────────────────────────────────
// State derived from events
interface AccountState {
id: string;
ownerId: string;
balance: number;
isOpen: boolean;
version: number;
}
class Account {
private state: AccountState;
private uncommittedEvents: AccountEvent[] = [];
constructor(state: AccountState) {
this.state = state;
}
// Factory method to create new account
static open(id: string, ownerId: string, initialBalance: number): Account {
const account = new Account({
id,
ownerId: '',
balance: 0,
isOpen: false,
version: -1,
});
account.applyNewEvent({
id: randomUUID(),
aggregateId: id,
type: 'AccountOpened',
data: { ownerId, initialBalance },
timestamp: new Date(),
version: 0,
metadata: {},
});
return account;
}
// Factory method to load from events
static fromEvents(events: AccountEvent[]): Account {
const account = new Account({
id: '',
ownerId: '',
balance: 0,
isOpen: false,
version: -1,
});
for (const event of events) {
account.applyEvent(event);
}
return account;
}
// Commands
deposit(amount: number, description: string): void {
if (!this.state.isOpen) {
throw new Error('Account is closed');
}
if (amount <= 0) {
throw new Error('Deposit amount must be positive');
}
this.applyNewEvent({
id: randomUUID(),
aggregateId: this.state.id,
type: 'MoneyDeposited',
data: { amount, description },
timestamp: new Date(),
version: this.state.version + 1,
metadata: {},
});
}
withdraw(amount: number, description: string): void {
if (!this.state.isOpen) {
throw new Error('Account is closed');
}
if (amount <= 0) {
throw new Error('Withdrawal amount must be positive');
}
if (amount > this.state.balance) {
throw new Error('Insufficient funds');
}
this.applyNewEvent({
id: randomUUID(),
aggregateId: this.state.id,
type: 'MoneyWithdrawn',
data: { amount, description },
timestamp: new Date(),
version: this.state.version + 1,
metadata: {},
});
}
close(reason: string): void {
if (!this.state.isOpen) {
throw new Error('Account is already closed');
}
if (this.state.balance !== 0) {
throw new Error('Cannot close account with non-zero balance');
}
this.applyNewEvent({
id: randomUUID(),
aggregateId: this.state.id,
type: 'AccountClosed',
data: { reason },
timestamp: new Date(),
version: this.state.version + 1,
metadata: {},
});
}
// Apply event to state (event handler)
private applyEvent(event: AccountEvent): void {
switch (event.type) {
case 'AccountOpened':
this.state = {
id: event.aggregateId,
ownerId: event.data.ownerId,
balance: event.data.initialBalance,
isOpen: true,
version: event.version,
};
break;
case 'MoneyDeposited':
this.state.balance += event.data.amount;
this.state.version = event.version;
break;
case 'MoneyWithdrawn':
this.state.balance -= event.data.amount;
this.state.version = event.version;
break;
case 'AccountClosed':
this.state.isOpen = false;
this.state.version = event.version;
break;
}
}
private applyNewEvent(event: AccountEvent): void {
this.applyEvent(event);
this.uncommittedEvents.push(event);
}
// Getters
get id(): string { return this.state.id; }
get balance(): number { return this.state.balance; }
get isOpen(): boolean { return this.state.isOpen; }
get version(): number { return this.state.version; }
get uncommitted(): AccountEvent[] { return [...this.uncommittedEvents]; }
clearUncommitted(): void {
this.uncommittedEvents = [];
}
}
// ─────────────────────────────────────────────────────────────────
// REPOSITORY
// ─────────────────────────────────────────────────────────────────
class AccountRepository {
constructor(private eventStore: EventStore) {}
async getById(id: string): Promise<Account | null> {
const events = await this.eventStore.getEvents(id);
if (events.length === 0) {
return null;
}
return Account.fromEvents(events as AccountEvent[]);
}
async save(account: Account): Promise<void> {
const uncommitted = account.uncommitted;
if (uncommitted.length === 0) {
return;
}
const expectedVersion = account.version - uncommitted.length;
await this.eventStore.append(account.id, uncommitted, expectedVersion);
account.clearUncommitted();
}
}
// ─────────────────────────────────────────────────────────────────
// USAGE
// ─────────────────────────────────────────────────────────────────
const eventStore = new PostgresEventStore(dbPool);
const accountRepo = new AccountRepository(eventStore);
// Open account
async function openAccount(ownerId: string, initialDeposit: number) {
const account = Account.open(randomUUID(), ownerId, initialDeposit);
await accountRepo.save(account);
return account.id;
}
// Deposit
async function deposit(accountId: string, amount: number, description: string) {
const account = await accountRepo.getById(accountId);
if (!account) throw new Error('Account not found');
account.deposit(amount, description);
await accountRepo.save(account);
return account.balance;
}
// Withdraw
async function withdraw(accountId: string, amount: number, description: string) {
const account = await accountRepo.getById(accountId);
if (!account) throw new Error('Account not found');
account.withdraw(amount, description);
await accountRepo.save(account);
return account.balance;
}
// Get balance (read model could also be used)
async function getBalance(accountId: string) {
const account = await accountRepo.getById(accountId);
if (!account) throw new Error('Account not found');
return account.balance;
}
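To see the whole flow without Postgres, here is a condensed in-memory sketch (illustrative names, not the article's classes verbatim): events are appended to a stream, state is always a fold over that stream, and a stale writer is rejected by the version check.

```typescript
// Minimal in-memory event store + fold, to illustrate replay and
// optimistic concurrency without a database.
type Ev = { aggregateId: string; type: string; data: any; version: number };

class InMemoryEventStore {
  private streams = new Map<string, Ev[]>();

  append(aggregateId: string, events: Ev[], expectedVersion: number): void {
    const stream = this.streams.get(aggregateId) ?? [];
    const current = stream.length - 1; // version of last stored event, -1 if empty
    if (current !== expectedVersion) {
      throw new Error(`Concurrency conflict: expected ${expectedVersion}, found ${current}`);
    }
    this.streams.set(aggregateId, [...stream, ...events]);
  }

  getEvents(aggregateId: string): Ev[] {
    return this.streams.get(aggregateId) ?? [];
  }
}

// State is never stored: it is derived by folding over the event stream.
function replayBalance(events: Ev[]): number {
  return events.reduce((balance, e) => {
    switch (e.type) {
      case 'AccountOpened': return e.data.initialBalance;
      case 'MoneyDeposited': return balance + e.data.amount;
      case 'MoneyWithdrawn': return balance - e.data.amount;
      default: return balance;
    }
  }, 0);
}

const store = new InMemoryEventStore();
store.append('acc-1', [{ aggregateId: 'acc-1', type: 'AccountOpened', data: { initialBalance: 100 }, version: 0 }], -1);
store.append('acc-1', [{ aggregateId: 'acc-1', type: 'MoneyDeposited', data: { amount: 50 }, version: 1 }], 0);
store.append('acc-1', [{ aggregateId: 'acc-1', type: 'MoneyWithdrawn', data: { amount: 30 }, version: 2 }], 1);

const balance = replayBalance(store.getEvents('acc-1')); // 100 + 50 - 30 = 120

// A stale writer (expects version 1, but the stream is at version 2) is rejected:
let conflict = false;
try {
  store.append('acc-1', [{ aggregateId: 'acc-1', type: 'MoneyDeposited', data: { amount: 1 }, version: 2 }], 1);
} catch { conflict = true; }
```

The `-1` expected version on the first append is the same convention the Postgres store uses for a brand-new aggregate.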
Pattern 4: CQRS (Command Query Responsibility Segregation)
CQRS with Event Sourcing
// CQRS PATTERN:
// ═══════════════════════════════════════════════════════════════
/*
CQRS separates reads and writes into different models:
┌─────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ │
│ │ Client │ │
│ └──────┬──────┘ │
│ │ │
│ ┌────────────────┴────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Commands │ │ Queries │ │
│ │ (Write Model) │ │ (Read Model) │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Event Store │───events────►│ Read Database │ │
│ │ (Source of │ │ (Optimized for │ │
│ │ Truth) │ │ queries) │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Benefits:
• Write model optimized for business logic
• Read model optimized for queries (denormalized, indexed)
• Can scale reads and writes independently
• Reads are simple and fast
Tradeoffs:
• Eventual consistency between write and read models
• More complexity
• Need to handle sync between models
*/
// ─────────────────────────────────────────────────────────────────
// COMMAND SIDE (Write Model)
// ─────────────────────────────────────────────────────────────────
// Commands
interface Command {
type: string;
timestamp: Date;
metadata: { userId: string; correlationId: string };
}
interface OpenAccountCommand extends Command {
type: 'OpenAccount';
data: {
accountId: string;
ownerId: string;
initialBalance: number;
};
}
interface DepositMoneyCommand extends Command {
type: 'DepositMoney';
data: {
accountId: string;
amount: number;
description: string;
};
}
interface WithdrawMoneyCommand extends Command {
type: 'WithdrawMoney';
data: {
accountId: string;
amount: number;
description: string;
};
}
interface TransferMoneyCommand extends Command {
type: 'TransferMoney';
data: {
fromAccountId: string;
toAccountId: string;
amount: number;
description: string;
};
}
// Command Handler
class AccountCommandHandler {
constructor(
private accountRepo: AccountRepository,
private eventPublisher: EventPublisher
) {}
async handle(command: Command): Promise<void> {
switch (command.type) {
case 'OpenAccount':
await this.handleOpenAccount(command as OpenAccountCommand);
break;
case 'DepositMoney':
await this.handleDeposit(command as DepositMoneyCommand);
break;
case 'WithdrawMoney':
await this.handleWithdraw(command as WithdrawMoneyCommand);
break;
case 'TransferMoney':
await this.handleTransfer(command as TransferMoneyCommand);
break;
default:
throw new Error(`Unknown command: ${command.type}`);
}
}
private async handleOpenAccount(cmd: OpenAccountCommand): Promise<void> {
const account = Account.open(
cmd.data.accountId,
cmd.data.ownerId,
cmd.data.initialBalance
);
// Capture events before save(): save() clears the uncommitted list
const events = account.uncommitted;
await this.accountRepo.save(account);
// Publish events for read model update
for (const event of events) {
await this.eventPublisher.publish(event);
}
}
private async handleDeposit(cmd: DepositMoneyCommand): Promise<void> {
const account = await this.accountRepo.getById(cmd.data.accountId);
if (!account) throw new Error('Account not found');
account.deposit(cmd.data.amount, cmd.data.description);
const events = account.uncommitted;
await this.accountRepo.save(account);
for (const event of events) {
await this.eventPublisher.publish(event);
}
}
private async handleWithdraw(cmd: WithdrawMoneyCommand): Promise<void> {
const account = await this.accountRepo.getById(cmd.data.accountId);
if (!account) throw new Error('Account not found');
account.withdraw(cmd.data.amount, cmd.data.description);
const events = account.uncommitted;
await this.accountRepo.save(account);
for (const event of events) {
await this.eventPublisher.publish(event);
}
}
private async handleTransfer(cmd: TransferMoneyCommand): Promise<void> {
// Load both accounts
const fromAccount = await this.accountRepo.getById(cmd.data.fromAccountId);
const toAccount = await this.accountRepo.getById(cmd.data.toAccountId);
if (!fromAccount || !toAccount) {
throw new Error('Account not found');
}
// Execute transfer
fromAccount.withdraw(cmd.data.amount, `Transfer to ${cmd.data.toAccountId}: ${cmd.data.description}`);
toAccount.deposit(cmd.data.amount, `Transfer from ${cmd.data.fromAccountId}: ${cmd.data.description}`);
const events = [...fromAccount.uncommitted, ...toAccount.uncommitted];
// Save both. Note: these are two separate appends, so a crash between
// them withdraws without depositing. A production transfer needs a saga
// or process manager to make the pair reliable.
await this.accountRepo.save(fromAccount);
await this.accountRepo.save(toAccount);
// Publish all events
for (const event of events) {
await this.eventPublisher.publish(event);
}
}
}
// ─────────────────────────────────────────────────────────────────
// QUERY SIDE (Read Model)
// ─────────────────────────────────────────────────────────────────
// Read model (denormalized for fast queries)
interface AccountReadModel {
id: string;
ownerId: string;
ownerName: string; // Denormalized
balance: number;
isOpen: boolean;
transactionCount: number;
lastTransactionAt: Date | null;
createdAt: Date;
updatedAt: Date;
}
interface TransactionReadModel {
id: string;
accountId: string;
type: 'deposit' | 'withdrawal' | 'transfer_in' | 'transfer_out';
amount: number;
description: string;
balanceAfter: number;
timestamp: Date;
}
// Projector: Updates read model when events occur
class AccountProjector {
constructor(private readDb: ReadDatabase) {}
async handleEvent(event: Event): Promise<void> {
switch (event.type) {
case 'AccountOpened':
await this.onAccountOpened(event as AccountOpenedEvent);
break;
case 'MoneyDeposited':
await this.onMoneyDeposited(event as MoneyDepositedEvent);
break;
case 'MoneyWithdrawn':
await this.onMoneyWithdrawn(event as MoneyWithdrawnEvent);
break;
case 'AccountClosed':
await this.onAccountClosed(event as AccountClosedEvent);
break;
}
}
private async onAccountOpened(event: AccountOpenedEvent): Promise<void> {
// Get owner name from user service (denormalize)
const owner = await userService.getById(event.data.ownerId);
await this.readDb.accounts.create({
id: event.aggregateId,
ownerId: event.data.ownerId,
ownerName: owner?.name || 'Unknown',
balance: event.data.initialBalance,
isOpen: true,
transactionCount: 0,
lastTransactionAt: null,
createdAt: event.timestamp,
updatedAt: event.timestamp,
});
// Also create initial transaction record if there was an initial deposit
if (event.data.initialBalance > 0) {
await this.readDb.transactions.create({
id: event.id,
accountId: event.aggregateId,
type: 'deposit',
amount: event.data.initialBalance,
description: 'Initial deposit',
balanceAfter: event.data.initialBalance,
timestamp: event.timestamp,
});
}
}
private async onMoneyDeposited(event: MoneyDepositedEvent): Promise<void> {
// Not idempotent on its own: with at-least-once delivery, a redelivered
// event would double-count. Guard this with the idempotency patterns
// covered later in this guide.
const account = await this.readDb.accounts.findById(event.aggregateId);
if (!account) return;
const newBalance = account.balance + event.data.amount;
await this.readDb.accounts.update(event.aggregateId, {
balance: newBalance,
transactionCount: account.transactionCount + 1,
lastTransactionAt: event.timestamp,
updatedAt: event.timestamp,
});
await this.readDb.transactions.create({
id: event.id,
accountId: event.aggregateId,
type: 'deposit',
amount: event.data.amount,
description: event.data.description,
balanceAfter: newBalance,
timestamp: event.timestamp,
});
}
private async onMoneyWithdrawn(event: MoneyWithdrawnEvent): Promise<void> {
const account = await this.readDb.accounts.findById(event.aggregateId);
if (!account) return;
const newBalance = account.balance - event.data.amount;
await this.readDb.accounts.update(event.aggregateId, {
balance: newBalance,
transactionCount: account.transactionCount + 1,
lastTransactionAt: event.timestamp,
updatedAt: event.timestamp,
});
await this.readDb.transactions.create({
id: event.id,
accountId: event.aggregateId,
type: 'withdrawal',
amount: event.data.amount,
description: event.data.description,
balanceAfter: newBalance,
timestamp: event.timestamp,
});
}
private async onAccountClosed(event: AccountClosedEvent): Promise<void> {
await this.readDb.accounts.update(event.aggregateId, {
isOpen: false,
updatedAt: event.timestamp,
});
}
// Rebuild entire read model from events (for recovery/migration)
async rebuild(): Promise<void> {
console.log('Rebuilding read model...');
// Clear existing read model
await this.readDb.accounts.deleteAll();
await this.readDb.transactions.deleteAll();
// Replay all events
const events = await eventStore.getAllEvents();
for (const event of events) {
await this.handleEvent(event);
}
console.log(`Rebuilt from ${events.length} events`);
}
}
// Query handlers (simple reads from read model)
class AccountQueryHandler {
constructor(private readDb: ReadDatabase) {}
async getAccount(id: string): Promise<AccountReadModel | null> {
return this.readDb.accounts.findById(id);
}
async getAccountsByOwner(ownerId: string): Promise<AccountReadModel[]> {
return this.readDb.accounts.findByOwnerId(ownerId);
}
async getTransactions(
accountId: string,
options: { limit: number; offset: number }
): Promise<TransactionReadModel[]> {
return this.readDb.transactions.findByAccountId(accountId, options);
}
async getAccountSummary(ownerId: string): Promise<{
totalBalance: number;
accountCount: number;
totalTransactions: number;
}> {
// Optimized query on denormalized read model
const accounts = await this.readDb.accounts.findByOwnerId(ownerId);
return {
totalBalance: accounts.reduce((sum, a) => sum + a.balance, 0),
accountCount: accounts.length,
totalTransactions: accounts.reduce((sum, a) => sum + a.transactionCount, 0),
};
}
}
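Stripped of database plumbing, the projector is just a fold of events into a denormalized row. A compact in-memory sketch of that idea (the shapes here are simplified, not the article's `ReadDatabase`):

```typescript
// Fold a stream of account events into one denormalized read-model row.
type Ev = { type: string; data: any; timestamp: Date };

interface AccountRow {
  balance: number;
  isOpen: boolean;
  transactionCount: number;
  lastTransactionAt: Date | null;
}

function project(events: Ev[]): AccountRow {
  const row: AccountRow = { balance: 0, isOpen: false, transactionCount: 0, lastTransactionAt: null };
  for (const e of events) {
    switch (e.type) {
      case 'AccountOpened':
        row.balance = e.data.initialBalance;
        row.isOpen = true;
        break;
      case 'MoneyDeposited':
        row.balance += e.data.amount;
        row.transactionCount++;
        row.lastTransactionAt = e.timestamp;
        break;
      case 'MoneyWithdrawn':
        row.balance -= e.data.amount;
        row.transactionCount++;
        row.lastTransactionAt = e.timestamp;
        break;
      case 'AccountClosed':
        row.isOpen = false;
        break;
    }
  }
  return row;
}

const row = project([
  { type: 'AccountOpened', data: { initialBalance: 100 }, timestamp: new Date('2024-01-01') },
  { type: 'MoneyDeposited', data: { amount: 25 }, timestamp: new Date('2024-01-02') },
  { type: 'MoneyWithdrawn', data: { amount: 40 }, timestamp: new Date('2024-01-03') },
]);
// row.balance is 85, row.transactionCount is 2
```

`rebuild()` is exactly this fold run from the beginning of the stream, which is why clearing and replaying is always a safe recovery path.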
Error Handling and Resilience
Retry Patterns
// RETRY AND ERROR HANDLING PATTERNS:
// ═══════════════════════════════════════════════════════════════
// ─────────────────────────────────────────────────────────────────
// EXPONENTIAL BACKOFF RETRY
// ─────────────────────────────────────────────────────────────────
interface RetryConfig {
maxAttempts: number;
baseDelay: number;
maxDelay: number;
retryableErrors?: string[];
}
async function withRetry<T>(
fn: () => Promise<T>,
config: RetryConfig
): Promise<T> {
let lastError: Error | undefined;
for (let attempt = 1; attempt <= config.maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
// Check if error is retryable
if (config.retryableErrors && !isRetryable(error, config.retryableErrors)) {
throw error;
}
if (attempt === config.maxAttempts) {
break;
}
// Calculate delay with exponential backoff + jitter
const delay = Math.min(
config.baseDelay * Math.pow(2, attempt - 1) + Math.random() * 100,
config.maxDelay
);
console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
await sleep(delay);
}
}
throw lastError;
}
function isRetryable(error: unknown, retryableErrors: string[]): boolean {
if (error instanceof Error) {
return retryableErrors.some(
(e) => error.message.includes(e) || error.name.includes(e)
);
}
return false;
}
// Usage
const result = await withRetry(
() => publishToQueue(event),
{
maxAttempts: 5,
baseDelay: 1000,
maxDelay: 30000,
retryableErrors: ['ECONNREFUSED', 'ETIMEDOUT', 'Connection lost'],
}
);
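With the jitter term set aside for determinism, the backoff formula above produces the following schedule (a sketch of the arithmetic, not new retry machinery):

```typescript
// Delay schedule produced by baseDelay * 2^(attempt - 1), capped at maxDelay.
// The random jitter from withRetry is omitted here so the output is exact.
function backoffDelay(attempt: number, baseDelay: number, maxDelay: number): number {
  return Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay);
}

const schedule = [1, 2, 3, 4, 5, 6].map((a) => backoffDelay(a, 1000, 30000));
// → [1000, 2000, 4000, 8000, 16000, 30000]  (the sixth attempt hits the cap)
```

The jitter matters in production: without it, many consumers that failed together retry together, hammering the recovering service in synchronized waves.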
// ─────────────────────────────────────────────────────────────────
// CIRCUIT BREAKER
// ─────────────────────────────────────────────────────────────────
enum CircuitState {
CLOSED = 'CLOSED', // Normal operation
OPEN = 'OPEN', // Failing, reject requests
HALF_OPEN = 'HALF_OPEN', // Testing if recovered
}
class CircuitBreaker {
private state = CircuitState.CLOSED;
private failures = 0;
private lastFailure?: Date;
private successCount = 0;
constructor(
private options: {
failureThreshold: number;
resetTimeout: number;
halfOpenRequests: number;
}
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === CircuitState.OPEN) {
if (this.shouldTryReset()) {
this.state = CircuitState.HALF_OPEN;
this.successCount = 0;
} else {
throw new CircuitBreakerOpenError('Circuit breaker is open');
}
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess(): void {
if (this.state === CircuitState.HALF_OPEN) {
this.successCount++;
if (this.successCount >= this.options.halfOpenRequests) {
this.state = CircuitState.CLOSED;
this.failures = 0;
console.log('Circuit breaker closed');
}
} else {
this.failures = 0;
}
}
private onFailure(): void {
this.failures++;
this.lastFailure = new Date();
if (this.state === CircuitState.HALF_OPEN) {
this.state = CircuitState.OPEN;
console.log('Circuit breaker opened (half-open test failed)');
} else if (this.failures >= this.options.failureThreshold) {
this.state = CircuitState.OPEN;
console.log('Circuit breaker opened');
}
}
private shouldTryReset(): boolean {
if (!this.lastFailure) return true;
const timeSinceFailure = Date.now() - this.lastFailure.getTime();
return timeSinceFailure >= this.options.resetTimeout;
}
}
// Usage
const circuitBreaker = new CircuitBreaker({
failureThreshold: 5,
resetTimeout: 30000,
halfOpenRequests: 3,
});
async function publishWithCircuitBreaker(event: Event) {
return circuitBreaker.execute(() => messageQueue.publish(event));
}
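To make the state machine concrete, here is a deterministic walk through the CLOSED → OPEN → HALF_OPEN → CLOSED cycle using a synchronous variant with an injected clock (a testing convenience; the class above reads `Date` directly):

```typescript
// Synchronous circuit breaker with an injectable clock, so each state
// transition can be stepped through deterministically.
type State = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class SyncBreaker {
  state: State = 'CLOSED';
  private failures = 0;
  private lastFailure = 0;
  private successes = 0;

  constructor(
    private threshold: number,
    private resetTimeout: number,
    private halfOpenSuccesses: number,
    private now: () => number
  ) {}

  execute<T>(fn: () => T): T {
    if (this.state === 'OPEN') {
      if (this.now() - this.lastFailure >= this.resetTimeout) {
        this.state = 'HALF_OPEN'; // probe the downstream again
        this.successes = 0;
      } else {
        throw new Error('circuit open'); // fail fast, no call made
      }
    }
    try {
      const result = fn();
      if (this.state === 'HALF_OPEN' && ++this.successes >= this.halfOpenSuccesses) {
        this.state = 'CLOSED';
        this.failures = 0;
      }
      return result;
    } catch (err) {
      this.failures++;
      this.lastFailure = this.now();
      if (this.state === 'HALF_OPEN' || this.failures >= this.threshold) {
        this.state = 'OPEN';
      }
      throw err;
    }
  }
}

let clock = 0;
const breaker = new SyncBreaker(3, 30_000, 2, () => clock);

// Three consecutive failures trip the breaker.
for (let i = 0; i < 3; i++) {
  try { breaker.execute(() => { throw new Error('downstream down'); }); } catch {}
}
const trippedState = breaker.state; // 'OPEN': further calls rejected immediately

clock += 30_000;              // wait out the reset timeout
breaker.execute(() => 'ok');  // first probe succeeds: HALF_OPEN
breaker.execute(() => 'ok');  // second success: CLOSED again
const finalState = breaker.state;
```

The same transitions apply to the async class above; the only difference is that its clock is wall time, so the OPEN window elapses in real seconds rather than via `clock += ...`.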
// ─────────────────────────────────────────────────────────────────
// DEAD LETTER QUEUE
// ─────────────────────────────────────────────────────────────────
interface DeadLetter {
originalEvent: Event;
error: string;
stack?: string;
attempts: number;
firstFailedAt: Date;
lastFailedAt: Date;
queueName: string;
}
class DeadLetterHandler {
constructor(
private dlqStore: DeadLetterStore,
private alertService: AlertService
) {}
async handleFailedEvent(
event: Event,
error: Error,
queueName: string,
attempts: number
): Promise<void> {
const deadLetter: DeadLetter = {
originalEvent: event,
error: error.message,
stack: error.stack,
attempts,
firstFailedAt: new Date(),
lastFailedAt: new Date(),
queueName,
};
await this.dlqStore.save(deadLetter);
// Alert if DLQ is growing
const dlqCount = await this.dlqStore.count();
if (dlqCount > 100) {
await this.alertService.alert({
severity: 'warning',
message: `Dead letter queue has ${dlqCount} items`,
});
}
console.error('Event moved to DLQ:', {
eventId: event.id,
eventType: event.type,
error: error.message,
});
}
// Manual retry from DLQ
async retryDeadLetter(deadLetterId: string): Promise<void> {
const deadLetter = await this.dlqStore.getById(deadLetterId);
if (!deadLetter) throw new Error('Dead letter not found');
// Republish to original queue
await messageQueue.publish(deadLetter.originalEvent, deadLetter.queueName);
// Remove from DLQ
await this.dlqStore.delete(deadLetterId);
}
// Batch retry all
async retryAll(queueName?: string): Promise<{ success: number; failed: number }> {
const deadLetters = await this.dlqStore.getAll(queueName);
let success = 0;
let failed = 0;
for (const dl of deadLetters) {
try {
await this.retryDeadLetter(dl.id);
success++;
} catch {
failed++;
}
}
return { success, failed };
}
}
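Putting retries and the dead letter queue together: a consumer typically retries in-process a few times, then parks the event rather than crash or drop it. A minimal synchronous sketch of that hand-off (in-memory structures, illustrative only):

```typescript
// Retry a handler up to maxAttempts, then park the event in a DLQ.
type Ev = { id: string; type: string };
interface DeadLetter { event: Ev; error: string; attempts: number }

function consumeWithDlq(
  event: Ev,
  handler: (e: Ev) => void,
  maxAttempts: number,
  dlq: DeadLetter[]
): void {
  let lastError: Error | undefined;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(event);
      return; // success, nothing to park
    } catch (err) {
      lastError = err as Error;
    }
  }
  // All attempts exhausted: record enough context to debug and replay later.
  dlq.push({ event, error: lastError?.message ?? 'unknown', attempts: maxAttempts });
}

const dlq: DeadLetter[] = [];
let calls = 0;
consumeWithDlq(
  { id: 'evt-1', type: 'order.placed' },
  () => { calls++; throw new Error('downstream unavailable'); },
  3,
  dlq
);
// calls === 3, and the DLQ holds one entry recording the failure
```

In a real consumer the retry loop would also back off between attempts, as in `withRetry` above; it is omitted here to keep the sketch synchronous.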
Idempotency
// IDEMPOTENCY PATTERNS:
// ═══════════════════════════════════════════════════════════════
// ─────────────────────────────────────────────────────────────────
// IDEMPOTENCY KEY PATTERN
// ─────────────────────────────────────────────────────────────────
interface ProcessedEvent {
eventId: string;
processedAt: Date;
result?: any;
}
class IdempotentEventProcessor {
constructor(
private processedStore: ProcessedEventStore,
private lockManager: LockManager
) {}
async process<T>(
event: Event,
handler: (event: Event) => Promise<T>
): Promise<T | null> {
// Check if already processed
const existing = await this.processedStore.get(event.id);
if (existing) {
console.log(`Event ${event.id} already processed, skipping`);
return existing.result;
}
// Acquire lock to prevent concurrent processing
const lock = await this.lockManager.acquire(`event:${event.id}`, 30000);
try {
// Double-check after acquiring lock
const existingAfterLock = await this.processedStore.get(event.id);
if (existingAfterLock) {
return existingAfterLock.result;
}
// Process the event
const result = await handler(event);
// Mark as processed
await this.processedStore.save({
eventId: event.id,
processedAt: new Date(),
result,
});
return result;
} finally {
await lock.release();
}
}
}
// Usage
const processor = new IdempotentEventProcessor(processedStore, lockManager);
consumer.subscribe('order.placed', async (event) => {
await processor.process(event, async (e) => {
// This will only run once per event ID
const order = e.payload as Order;
await inventory.reserve(order.items);
await sendOrderConfirmation(order);
return { success: true };
});
});
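The pattern above, stripped to its essence: remember processed event ids and return the cached result on a duplicate. A synchronous in-memory sketch (no lock manager, so it only guards against redelivery, not concurrent processing):

```typescript
// Deduplicate by event id: the handler's side effects run exactly once,
// and duplicates get the cached result back.
type Ev = { id: string };

class InMemoryIdempotentProcessor {
  private processed = new Map<string, unknown>();

  process<T>(event: Ev, handler: (e: Ev) => T): T {
    if (this.processed.has(event.id)) {
      return this.processed.get(event.id) as T; // duplicate: skip side effects
    }
    const result = handler(event);
    this.processed.set(event.id, result);
    return result;
  }
}

const proc = new InMemoryIdempotentProcessor();
let sideEffects = 0;

const first = proc.process({ id: 'evt-42' }, () => ++sideEffects);
const second = proc.process({ id: 'evt-42' }, () => ++sideEffects); // redelivery

// sideEffects === 1; both calls return the same cached result
```

The production version needs the lock (to close the race between two consumers seeing "not processed" simultaneously) and durable storage (so dedup survives restarts), which is exactly what the class above adds.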
// ─────────────────────────────────────────────────────────────────
// DATABASE-LEVEL IDEMPOTENCY
// ─────────────────────────────────────────────────────────────────
// Use database constraints to ensure idempotency
async function handlePaymentReceived(event: PaymentReceivedEvent) {
try {
await db.transaction(async (tx) => {
// This INSERT will fail if event already processed (unique constraint)
await tx.processedEvents.create({
eventId: event.id,
processedAt: new Date(),
});
// Process payment
await tx.orders.update({
where: { id: event.payload.orderId },
data: { paymentStatus: 'paid' },
});
await tx.payments.create({
orderId: event.payload.orderId,
amount: event.payload.amount,
transactionId: event.payload.transactionId,
});
});
} catch (error) {
if (isUniqueConstraintViolation(error)) {
console.log(`Event ${event.id} already processed`);
return; // Idempotent - safe to ignore
}
throw error;
}
}
Observability
Event Tracing
// OBSERVABILITY FOR EVENT-DRIVEN SYSTEMS:
// ═══════════════════════════════════════════════════════════════
// ─────────────────────────────────────────────────────────────────
// CORRELATION AND TRACING
// ─────────────────────────────────────────────────────────────────
interface TracingMetadata {
traceId: string; // Unique ID for entire flow
spanId: string; // Unique ID for this event
parentSpanId?: string; // ID of event that caused this one
correlationId: string; // Business correlation (e.g., orderId)
}
class TracedEventBus {
constructor(
private eventBus: EventBus,
private tracer: Tracer
) {}
async publish<T extends DomainEvent>(
event: Omit<T, 'id' | 'timestamp'>,
parentContext?: TracingMetadata
): Promise<void> {
const traceId = parentContext?.traceId || randomUUID();
const spanId = randomUUID();
const tracedEvent: T = {
...event,
id: randomUUID(),
timestamp: new Date(),
metadata: {
...event.metadata,
tracing: {
traceId,
spanId,
parentSpanId: parentContext?.spanId,
correlationId: event.metadata.correlationId,
},
},
} as T;
// Create trace span
const span = this.tracer.startSpan(`event.publish.${event.type}`, {
traceId,
spanId,
parentSpanId: parentContext?.spanId,
});
try {
await this.eventBus.publish(tracedEvent);
span.setStatus('ok');
} catch (error) {
span.setStatus('error', (error as Error).message);
throw error;
} finally {
span.end();
}
}
subscribe<T extends DomainEvent>(
eventType: string,
handler: (event: T, context: TracingMetadata) => Promise<void>
): void {
this.eventBus.subscribe<T>(eventType, async (event) => {
const tracing = event.metadata.tracing as TracingMetadata;
const span = this.tracer.startSpan(`event.handle.${eventType}`, {
traceId: tracing.traceId,
spanId: randomUUID(),
parentSpanId: tracing.spanId,
});
try {
await handler(event, tracing);
span.setStatus('ok');
} catch (error) {
span.setStatus('error', (error as Error).message);
throw error;
} finally {
span.end();
}
});
}
}
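The propagation rule in the class above reduces to: a root event mints a `traceId`; every event caused by it inherits that `traceId` and records its parent's `spanId`. A small sketch of just that bookkeeping:

```typescript
import { randomUUID } from 'node:crypto';

// Minimal tracing-metadata propagation: children share the root's traceId,
// and each parentSpanId links one hop back up the causal chain.
interface Trace { traceId: string; spanId: string; parentSpanId?: string }

function childOf(parent?: Trace): Trace {
  return {
    traceId: parent?.traceId ?? randomUUID(), // root event mints the trace id
    spanId: randomUUID(),                     // every event gets its own span
    parentSpanId: parent?.spanId,
  };
}

const root = childOf();            // e.g. order.placed
const child = childOf(root);       // e.g. payment.requested, caused by root
const grandchild = childOf(child); // e.g. payment.received

// All three share root.traceId; following parentSpanId reconstructs the chain.
```

Querying your logs by that shared `traceId` is what turns a pile of per-service log lines into one end-to-end flow.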
// ─────────────────────────────────────────────────────────────────
// METRICS
// ─────────────────────────────────────────────────────────────────
import { Counter, Histogram, Registry } from 'prom-client';
const registry = new Registry();
// Event metrics
const eventsPublished = new Counter({
name: 'events_published_total',
help: 'Total number of events published',
labelNames: ['event_type'],
registers: [registry],
});
const eventsProcessed = new Counter({
name: 'events_processed_total',
help: 'Total number of events processed',
labelNames: ['event_type', 'status'],
registers: [registry],
});
const eventProcessingDuration = new Histogram({
name: 'event_processing_duration_seconds',
help: 'Event processing duration in seconds',
labelNames: ['event_type'],
buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5],
registers: [registry],
});
const eventLag = new Histogram({
name: 'event_lag_seconds',
help: 'Time between event creation and processing',
labelNames: ['event_type'],
buckets: [0.1, 0.5, 1, 5, 10, 30, 60, 300],
registers: [registry],
});
// Instrumented event handler
function instrumentedHandler<T extends DomainEvent>(
eventType: string,
handler: (event: T) => Promise<void>
): (event: T) => Promise<void> {
return async (event: T) => {
const end = eventProcessingDuration.startTimer({ event_type: eventType });
// Track lag
const lag = (Date.now() - event.timestamp.getTime()) / 1000;
eventLag.observe({ event_type: eventType }, lag);
try {
await handler(event);
eventsProcessed.inc({ event_type: eventType, status: 'success' });
} catch (error) {
eventsProcessed.inc({ event_type: eventType, status: 'error' });
throw error;
} finally {
end();
}
};
}
// ─────────────────────────────────────────────────────────────────
// STRUCTURED LOGGING
// ─────────────────────────────────────────────────────────────────
import pino from 'pino';
const logger = pino({
level: 'info',
formatters: {
level: (label) => ({ level: label }),
},
});
function createEventLogger(event: DomainEvent) {
return logger.child({
eventId: event.id,
eventType: event.type,
traceId: (event.metadata.tracing as TracingMetadata)?.traceId,
correlationId: event.metadata.correlationId,
});
}
// Usage in handlers
eventBus.subscribe<OrderPlacedEvent>('order.placed', async (event) => {
const log = createEventLogger(event);
log.info('Processing order placed event');
try {
await processOrder(event.payload);
log.info({ orderId: event.payload.orderId }, 'Order processed successfully');
} catch (error) {
log.error({ error, orderId: event.payload.orderId }, 'Failed to process order');
throw error;
}
});
Quick Reference
┌─────────────────────────────────────────────────────────────────────┐
│ EVENT-DRIVEN NODE.JS QUICK REFERENCE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ PATTERN SELECTION │
│ ───────────────────────────────────────────────────────────────── │
│ In-process (EventEmitter) → Single app, fire-and-forget │
│ Redis Pub/Sub → Multi-service, at-most-once delivery │
│ BullMQ/Job Queue → Reliable background jobs, retries │
│ RabbitMQ/Kafka → Durable, at-least-once delivery │
│ Event Sourcing → Full audit trail, replay capability │
│ CQRS → Separate read/write optimization │
│ │
│ EVENT STRUCTURE │
│ ───────────────────────────────────────────────────────────────── │
│ interface DomainEvent { │
│ id: string; // Unique event ID │
│ type: string; // 'entity.action' format │
│ timestamp: Date; │
│ payload: T; // Event data │
│ metadata: { │
│ correlationId: string; // Business correlation │
│ causationId?: string; // Event that caused this │
│ userId?: string; // Who triggered │
│ }; │
│ } │
│ │
│ NAMING CONVENTIONS │
│ ───────────────────────────────────────────────────────────────── │
│ Events: Past tense (facts that happened) │
│ ✓ user.registered, order.placed, payment.received │
│ ✗ register.user, place.order, receive.payment │
│ │
│ DELIVERY GUARANTEES │
│ ───────────────────────────────────────────────────────────────── │
│ At-most-once: Event may be lost (Redis Pub/Sub) │
│ At-least-once: Event may be duplicated (RabbitMQ + ack) │
│ Exactly-once: Idempotent handler + at-least-once │
│ │
│ IDEMPOTENCY PATTERN │
│ ───────────────────────────────────────────────────────────────── │
│ 1. Check if event.id already processed │
│ 2. Acquire lock on event.id │
│ 3. Process event │
│ 4. Mark event.id as processed (in same transaction if possible) │
│ 5. Release lock │
│ │
│ RETRY STRATEGY │
│ ───────────────────────────────────────────────────────────────── │
│ Exponential backoff: delay = base * 2^attempt + jitter │
│ Max attempts: 3-5 for most cases │
│ Dead letter queue: After max attempts │
│ │
│ OBSERVABILITY CHECKLIST │
│ ───────────────────────────────────────────────────────────────── │
│ □ Trace ID in all events (correlate across services) │
│ □ Metrics: published, processed, failed, lag │
│ □ Structured logging with event context │
│ □ Dead letter queue monitoring │
│ □ Consumer lag alerting │
│ │
│ COMMON MISTAKES │
│ ───────────────────────────────────────────────────────────────── │
│ ✗ Not handling duplicate events (no idempotency) │
│ ✗ Synchronous event handling blocking requests │
│ ✗ No retry logic for transient failures │
│ ✗ Missing correlation IDs (can't trace flows) │
│ ✗ Huge events (put data in DB, reference in event) │
│ ✗ No dead letter handling │
│ ✗ Mixing commands and events │
│ │
│ TOOLS │
│ ───────────────────────────────────────────────────────────────── │
│ Queue: BullMQ, RabbitMQ, Amazon SQS │
│ Streaming: Kafka, Redis Streams, Amazon Kinesis │
│ Event Store: EventStoreDB, PostgreSQL, DynamoDB │
│ Observability: OpenTelemetry, Prometheus, Jaeger │
│ │
└─────────────────────────────────────────────────────────────────────┘
Conclusion
Event-driven architecture in Node.js isn't a single pattern—it's a spectrum of approaches, each suited to different needs.
Start simple:
- Use EventEmitter for in-process decoupling
- Add BullMQ when you need reliable background jobs
- Introduce RabbitMQ or Kafka when crossing service boundaries
Key principles:
- Events are facts. Name them in past tense. They describe what happened, not what should happen.
- Design for idempotency. Events will be delivered multiple times. Your handlers must handle this gracefully.
- Include correlation IDs. Without them, debugging distributed flows is nearly impossible.
- Plan for failure. Retry transient errors, dead-letter permanent failures, and monitor both.
- Keep events small. Store the data, reference it in the event. Don't put your entire database row in every event.
When to use what:
- Simple decoupling: EventEmitter
- Reliable async jobs: BullMQ
- Service-to-service: RabbitMQ or Kafka
- Audit trail needed: Event sourcing
- Complex read patterns: CQRS
Node.js is naturally suited for event-driven systems. The event loop, non-blocking I/O, and rich ecosystem make it an excellent choice. But the patterns that work for small applications don't automatically scale. As your system grows, invest in observability, idempotency, and proper error handling.
The goal isn't to use the most sophisticated pattern—it's to use the right pattern for your current needs, with a clear path to evolve when those needs change.