Backend Job Processing: Worker Pools, Retry Strategies & Priority Queues
Backend Job Processing: Worker Pools, Retry Strategies & Priority Queues
Why Background Jobs?
HTTP requests should complete fast. Any work that takes longer than ~200ms or doesn't need to block the user belongs in a background job: sending emails, processing images, generating reports, syncing external systems, computing analytics.
Synchronous (blocks user) Asynchronous (background job)
┌──────────┐ ┌──────────┐
│ Client │ │ Client │
│ │ │ │
│ POST /pay │──────────────────────┐ │ POST /pay │──────────┐
│ │ │ │ │ │
│ Waiting.. │ │ │ 202 │◄─────────┘ (immediate)
│ Waiting.. │ │ │ Accepted │
│ Waiting.. │ ┌──────────────┐ │ └──────────┘
│ Waiting.. │ │ Charge card │ │
│ Waiting.. │ │ Send email │ │ Background:
│ Waiting.. │ │ Update ledger│ │ ┌──────────────┐
│ Waiting.. │ │ Notify fraud │ │ │ Job Queue │
│ │ │ Update report│ │ │ │
│ 200 OK │◄───└──────────────┘──┘ │ ┌──────────┐│ ┌──────────────┐
│ │ (3-5 seconds) │ │ PayJob ││───►│ Worker Pool │
└──────────┘ │ └──────────┘│ │ │
└──────────────┘ │ Charge card │
User waits 3-5 seconds │ Send email │
Risk of timeout/retry │ Update ledger│
└──────────────┘
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ Job System │
│ │
│ ┌──────────┐ ┌───────────────┐ ┌──────────────────┐ │
│ │ Producer │───►│ Job Queue │───►│ Worker Pool │ │
│ │ (API) │ │ │ │ │ │
│ └──────────┘ │ ┌─────────┐ │ │ ┌────────────┐ │ │
│ │ │ Priority│ │ │ │ Worker 1 │ │ │
│ ┌──────────┐ │ │ Queues │ │ │ │ │ │ │
│ │ Cron │───►│ │ │ │ │ ├────────────┤ │ │
│ │ Scheduler│ │ │ high │ │ │ │ Worker 2 │ │ │
│ └──────────┘ │ │ default │ │ │ │ │ │ │
│ │ │ low │ │ │ ├────────────┤ │ │
│ ┌──────────┐ │ └─────────┘ │ │ │ Worker N │ │ │
│ │ Recurring│───►│ │ │ │ │ │ │
│ │ Jobs │ │ ┌─────────┐ │ │ └────────────┘ │ │
│ └──────────┘ │ │ Delayed │ │ │ │ │
│ │ │ Queue │ │ │ ┌────────────┐ │ │
│ │ └─────────┘ │ │ │ Dead Letter│ │ │
│ └───────────────┘ │ │ Queue │ │ │
│ │ └────────────┘ │ │
│ └──────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Job State Machine │ │
│ │ pending → scheduled → running → completed │ │
│ │ │ └────→ failed → retrying │ │
│ │ │ └→ dead │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
1. Job Definition & Queue
// ─── Core Types ────────────────────────────────────────
/**
 * A unit of background work flowing through the queue.
 * Mutable record: the queue and workers update status/attempt/lock fields
 * in place as the job moves through its lifecycle.
 */
interface Job<T = unknown> {
  id: string;            // unique, assigned at enqueue time
  name: string;          // handler lookup key (see JobQueue.register)
  data: T;               // payload handed to the handler
  priority: 'critical' | 'high' | 'default' | 'low';
  status: JobStatus;
  attempts: number;      // incremented each time a worker dequeues the job
  maxAttempts: number;   // after this many failed attempts the job goes dead
  createdAt: number;     // epoch ms
  scheduledAt: number;   // epoch ms — earliest time the job may run
  startedAt?: number;    // epoch ms — set when a worker claims the job
  completedAt?: number;  // epoch ms — set on success
  failedAt?: number;     // epoch ms — set on each failure
  error?: string;        // message of the most recent failure
  result?: unknown;      // handler return value, recorded on completion
  backoffStrategy: BackoffStrategy;
  timeout: number; // ms
  retainFor: number; // ms — how long to keep completed jobs
  lockUntil?: number; // timestamp — pessimistic lock for worker
  workerId?: string; // which worker is processing this
}
// Lifecycle states; see the state-machine diagram above:
// pending → running → completed, or → failed → retrying/dead.
type JobStatus = 'pending' | 'scheduled' | 'running' | 'completed' | 'failed' | 'retrying' | 'dead';
/** How retry delays grow between attempts. */
interface BackoffStrategy {
  type: 'fixed' | 'exponential' | 'polynomial' | 'custom';
  initialDelay: number; // ms
  maxDelay: number; // ms — cap applied after growth
  factor?: number; // multiplier for exponential
  jitterPercent?: number; // 0-100 — symmetric randomization of the delay
}
// ─── Job Queue ─────────────────────────────────────────
/**
 * In-memory priority job queue with delayed scheduling, retries with
 * backoff, recurring jobs, and a minimal pub/sub event system.
 *
 * Persistence is delegated to the injected {@link JobStorage}; the
 * in-memory queues define dispatch order, storage is the durable record.
 */
class JobQueue {
  /** Ready-to-run jobs, bucketed by priority name. */
  private queues: Map<string, Job[]> = new Map();
  /** Jobs waiting for their scheduledAt time, sorted ascending by due time. */
  private delayed: Job[] = [];
  /** Job name → handler function. */
  private handlers = new Map<string, JobHandler>();
  /** Event name → subscriber set. */
  private listeners = new Map<string, Set<(event: JobEvent) => void>>();

  constructor(private storage: JobStorage) {}

  // ── Register Job Handler ─────────────────────────
  /** Register the handler that will process jobs enqueued under `name`. */
  register<T>(name: string, handler: JobHandler<T>): void {
    this.handlers.set(name, handler as JobHandler);
  }

  // ── Enqueue Job ──────────────────────────────────
  /**
   * Create and persist a job, then place it on the ready or delayed queue.
   * @throws Error if no handler has been registered for `name`.
   */
  async enqueue<T>(
    name: string,
    data: T,
    options: Partial<JobOptions> = {}
  ): Promise<Job<T>> {
    if (!this.handlers.has(name)) {
      throw new Error(`No handler registered for job: ${name}`);
    }
    const job: Job<T> = {
      id: this.generateId(),
      name,
      data,
      priority: options.priority || 'default',
      status: options.delay ? 'scheduled' : 'pending',
      attempts: 0,
      maxAttempts: options.maxAttempts || 3,
      createdAt: Date.now(),
      scheduledAt: options.delay ? Date.now() + options.delay : Date.now(),
      backoffStrategy: options.backoff || {
        type: 'exponential',
        initialDelay: 1000,
        maxDelay: 300_000, // 5 min
        factor: 2,
        jitterPercent: 15,
      },
      timeout: options.timeout || 30_000,
      retainFor: options.retainFor || 7 * 24 * 3600 * 1000, // 7 days
    };
    await this.storage.save(job);
    if (job.status === 'scheduled') {
      this.delayed.push(job);
      this.delayed.sort((a, b) => a.scheduledAt - b.scheduledAt);
    } else {
      this.addToQueue(job);
    }
    this.emit('enqueued', { job });
    return job;
  }

  // ── Schedule Recurring Job ───────────────────────
  /**
   * Enqueue `name` for its next cron occurrence, then re-schedule it after
   * each completion.
   */
  async scheduleRecurring(
    name: string,
    cron: string,
    data: unknown,
    options: Partial<JobOptions> = {}
  ): Promise<void> {
    const nextRun = this.nextCronTime(cron);
    const job = await this.enqueue(name, data, {
      ...options,
      delay: nextRun - Date.now(),
    });
    // One-shot listener: it removes itself once this run completes. Without
    // the removal, every recurrence would register another permanent
    // listener that fires on EVERY completed job — an unbounded leak that
    // also makes each completion O(number of past runs).
    const onCompleted = (event: JobEvent) => {
      if (event.job.id === job.id) {
        this.off('completed', onCompleted);
        void this.scheduleRecurring(name, cron, data, options);
      }
    };
    this.on('completed', onCompleted);
  }

  // ── Dequeue (for workers) ────────────────────────
  /**
   * Claim the next runnable job for `workerId`, or null if none is ready.
   * Promotes due delayed jobs first, then scans priorities high → low.
   * The claimed job is locked until timeout plus a 5s grace period.
   */
  async dequeue(workerId: string): Promise<Job | null> {
    this.promoteDelayedJobs();
    const priorities: Job['priority'][] = ['critical', 'high', 'default', 'low'];
    for (const priority of priorities) {
      const queue = this.queues.get(priority);
      if (queue && queue.length > 0) {
        const job = queue.shift()!;
        // Lock the job for this worker
        job.status = 'running';
        job.startedAt = Date.now();
        job.workerId = workerId;
        job.lockUntil = Date.now() + job.timeout + 5000; // timeout + grace
        job.attempts++;
        await this.storage.save(job);
        this.emit('started', { job });
        return job;
      }
    }
    return null;
  }

  // ── Complete Job ─────────────────────────────────
  /** Mark a job as successfully finished and release its lock. */
  async complete(jobId: string, result?: unknown): Promise<void> {
    const job = await this.storage.get(jobId);
    if (!job) return;
    job.status = 'completed';
    job.completedAt = Date.now();
    job.result = result;
    job.lockUntil = undefined;
    job.workerId = undefined;
    await this.storage.save(job);
    this.emit('completed', { job });
    // Schedule cleanup after the retention period. unref() (available in
    // Node.js) keeps this long-lived timer from holding the process open;
    // a production system would use a periodic sweep instead of one timer
    // per completed job.
    const cleanup = setTimeout(() => {
      void this.storage.delete(jobId);
    }, job.retainFor);
    (cleanup as unknown as { unref?: () => void }).unref?.();
  }

  // ── Fail Job ─────────────────────────────────────
  /**
   * Record a failure. Retries with backoff while attempts remain; once the
   * budget is exhausted, the job is marked dead (dead-letter).
   */
  async fail(jobId: string, error: Error): Promise<void> {
    const job = await this.storage.get(jobId);
    if (!job) return;
    job.failedAt = Date.now();
    job.error = error.message;
    job.lockUntil = undefined;
    job.workerId = undefined;
    if (job.attempts < job.maxAttempts) {
      // Retry with backoff
      const delay = this.calculateBackoff(job);
      job.status = 'retrying';
      job.scheduledAt = Date.now() + delay;
      this.delayed.push(job);
      this.delayed.sort((a, b) => a.scheduledAt - b.scheduledAt);
      this.emit('retrying', { job, delay });
    } else {
      // Exhausted retries — dead letter
      job.status = 'dead';
      this.emit('dead', { job });
    }
    await this.storage.save(job);
  }

  // ── Backoff Calculation ──────────────────────────
  /** Delay in ms before the next attempt, per the job's backoff strategy. */
  private calculateBackoff(job: Job): number {
    const { backoffStrategy: bs, attempts } = job;
    let delay: number;
    switch (bs.type) {
      case 'fixed':
        delay = bs.initialDelay;
        break;
      case 'exponential':
        delay = bs.initialDelay * Math.pow(bs.factor || 2, attempts - 1);
        break;
      case 'polynomial':
        delay = bs.initialDelay * Math.pow(attempts, bs.factor || 2);
        break;
      default:
        delay = bs.initialDelay;
    }
    // Apply symmetric jitter of ±jitterPercent.
    if (bs.jitterPercent) {
      const jitter = delay * (bs.jitterPercent / 100);
      delay += Math.random() * jitter * 2 - jitter;
    }
    return Math.min(delay, bs.maxDelay);
  }

  // ── Delayed Job Promotion ────────────────────────
  /** Move delayed jobs whose due time has arrived onto the ready queues. */
  private promoteDelayedJobs(): void {
    const now = Date.now();
    while (this.delayed.length > 0 && this.delayed[0].scheduledAt <= now) {
      const job = this.delayed.shift()!;
      job.status = 'pending';
      this.addToQueue(job);
    }
  }

  /** Append a job to its priority bucket, creating the bucket lazily. */
  private addToQueue(job: Job): void {
    const priority = job.priority;
    if (!this.queues.has(priority)) {
      this.queues.set(priority, []);
    }
    this.queues.get(priority)!.push(job);
  }

  // Event system
  /** Subscribe to a lifecycle event ('enqueued', 'started', 'completed', …). */
  on(event: string, listener: (e: JobEvent) => void): void {
    if (!this.listeners.has(event)) this.listeners.set(event, new Set());
    this.listeners.get(event)!.add(listener);
  }

  /** Unsubscribe a previously registered listener. */
  off(event: string, listener: (e: JobEvent) => void): void {
    this.listeners.get(event)?.delete(listener);
  }

  private emit(event: string, data: JobEvent): void {
    for (const listener of this.listeners.get(event) || []) listener(data);
  }

  private generateId(): string { return `job-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; }
  private nextCronTime(cron: string): number { return Date.now() + 60000; /* simplified */ }
}
/** Processes one job's payload; the resolved value is stored as job.result. */
type JobHandler<T = unknown> = (data: T, context: JobContext) => Promise<unknown>;
/** Per-execution info handed to handlers; `signal` fires on timeout/cancel. */
interface JobContext { jobId: string; attempt: number; signal: AbortSignal; progress(percent: number): void; }
/** Options accepted by JobQueue.enqueue; all fields optional at call sites. */
interface JobOptions { priority: Job['priority']; maxAttempts: number; delay: number; backoff: BackoffStrategy; timeout: number; retainFor: number; }
/** Payload delivered to queue event listeners (`delay` set for 'retrying'). */
interface JobEvent { job: Job; delay?: number; }
/** Durable persistence layer for job records. */
interface JobStorage { save(job: Job): Promise<void>; get(id: string): Promise<Job | null>; delete(id: string): Promise<void>; }
2. Worker Pool
// ─── Worker Pool with Concurrency Control ──────────────
/**
 * Fixed-size pool of workers that poll the queue on an interval and hand
 * each ready job to an idle worker. Supports graceful shutdown with a
 * drain deadline.
 */
class WorkerPool {
  private workers: Worker[] = [];
  private isRunning = false;
  private pollInterval: ReturnType<typeof setInterval> | null = null;

  constructor(
    private queue: JobQueue,
    private config: WorkerPoolConfig
  ) {}

  // ── Start Pool ───────────────────────────────────
  /** Spin up `concurrency` workers and begin polling for work. */
  start(): void {
    this.isRunning = true;
    for (let i = 0; i < this.config.concurrency; i++) {
      const worker = new Worker(
        `worker-${i}`,
        this.queue,
        this.config
      );
      this.workers.push(worker);
    }
    // Poll for available work. dispatch() is async and setInterval discards
    // its promise, so catch here to avoid unhandled rejections.
    this.pollInterval = setInterval(
      () => {
        this.dispatch().catch(err =>
          console.error('Worker pool dispatch failed:', err)
        );
      },
      this.config.pollIntervalMs
    );
    console.log(
      `Worker pool started: ${this.config.concurrency} workers, ` +
      `polling every ${this.config.pollIntervalMs}ms`
    );
  }

  // ── Stop Pool (graceful) ─────────────────────────
  /**
   * Graceful shutdown: stop polling, wait up to `timeoutMs` for in-flight
   * jobs to finish, then force-cancel stragglers (their jobs are re-delivered
   * elsewhere once their queue lock expires).
   */
  async stop(timeoutMs: number = 30_000): Promise<void> {
    this.isRunning = false;
    if (this.pollInterval) clearInterval(this.pollInterval);
    console.log('Shutting down worker pool...');
    // Wait for active jobs to finish, racing against the deadline. Keep a
    // handle on the deadline timer so it can be cleared when the drain wins —
    // otherwise it would keep the process alive for the full timeout.
    const activeWorkers = this.workers.filter(w => w.isActive());
    let deadlineTimer: ReturnType<typeof setTimeout> | undefined;
    await Promise.race([
      Promise.all(activeWorkers.map(w => w.waitForCompletion())),
      new Promise(resolve => {
        deadlineTimer = setTimeout(resolve, timeoutMs);
      }),
    ]);
    if (deadlineTimer) clearTimeout(deadlineTimer);
    // Force kill any still-running workers
    const stillActive = this.workers.filter(w => w.isActive());
    for (const worker of stillActive) {
      console.warn(`Force cancelling worker ${worker.id}`);
      worker.cancel();
    }
    console.log('Worker pool shutdown complete');
  }

  // ── Dispatch Jobs to Idle Workers ────────────────
  /** Hand one job to each currently idle worker (fire-and-forget). */
  private async dispatch(): Promise<void> {
    if (!this.isRunning) return;
    const idleWorkers = this.workers.filter(w => !w.isActive());
    for (const worker of idleWorkers) {
      const job = await this.queue.dequeue(worker.id);
      if (job) {
        void worker.execute(job);
      }
    }
  }

  // ── Pool Stats ───────────────────────────────────
  /** Point-in-time utilization snapshot. Safe to call before start(). */
  getStats(): PoolStats {
    const active = this.workers.filter(w => w.isActive()).length;
    return {
      totalWorkers: this.workers.length,
      activeWorkers: active,
      idleWorkers: this.workers.length - active,
      // Guard the empty pool: 0/0 would report NaN.
      utilization: this.workers.length > 0 ? active / this.workers.length : 0,
    };
  }
}
/** Tuning knobs for a WorkerPool. */
interface WorkerPoolConfig {
  concurrency: number;        // number of workers to spawn
  pollIntervalMs: number;     // how often the pool looks for new jobs
  jobTimeoutMs: number;       // NOTE(review): not read by the visible Worker
                              // code, which uses job.timeout — confirm intent
  gracefulShutdownMs: number; // drain budget before force-cancelling
}
/** Point-in-time snapshot returned by WorkerPool.getStats(). */
interface PoolStats {
  totalWorkers: number;
  activeWorkers: number;
  idleWorkers: number;
  utilization: number; // activeWorkers / totalWorkers
}
// ─── Individual Worker ─────────────────────────────────
/**
 * A single worker: executes one job at a time, enforces the job's timeout
 * via an AbortSignal, and reports the outcome back to the queue.
 */
class Worker {
  private currentJob: Job | null = null;
  private abortController: AbortController | null = null;
  private completionPromise: Promise<void> | null = null;
  private completionResolve: (() => void) | null = null;

  constructor(
    public id: string,
    private queue: JobQueue,
    private config: WorkerPoolConfig
  ) {}

  /** True while a job is executing on this worker. */
  isActive(): boolean {
    return this.currentJob !== null;
  }

  /**
   * Run `job` to completion or failure. Always settles the completion
   * promise (via finally) so waitForCompletion() resolves even on error.
   */
  async execute(job: Job): Promise<void> {
    this.currentJob = job;
    this.abortController = new AbortController();
    this.completionPromise = new Promise(resolve => {
      this.completionResolve = resolve;
    });
    // Fire the abort signal when the job exceeds its timeout.
    const timeoutId = setTimeout(() => {
      this.abortController?.abort(new Error(`Job ${job.id} timed out after ${job.timeout}ms`));
    }, job.timeout);
    try {
      // NOTE(review): reaches into JobQueue's private `handlers` map via
      // `as any` — a public accessor on JobQueue would avoid the escape hatch.
      const handler = (this.queue as any).handlers.get(job.name);
      if (!handler) throw new Error(`No handler for job: ${job.name}`);
      const context: JobContext = {
        jobId: job.id,
        attempt: job.attempts,
        signal: this.abortController.signal,
        progress: (percent: number) => {
          console.log(`Job ${job.id}: ${percent}% complete`);
        },
      };
      // Race the handler against the abort signal. Previously the timeout
      // was advisory: a handler that ignored `signal` ran past its deadline
      // and was still marked completed. Now the job is failed the moment the
      // signal fires; the orphaned handler promise is discarded.
      const signal = this.abortController.signal;
      const abortedPromise = new Promise<never>((_, reject) => {
        signal.addEventListener(
          'abort',
          () => reject((signal as { reason?: Error }).reason ?? new Error('aborted')),
          { once: true }
        );
      });
      const result = await Promise.race([handler(job.data, context), abortedPromise]);
      await this.queue.complete(job.id, result);
    } catch (err) {
      await this.queue.fail(job.id, err as Error);
    } finally {
      clearTimeout(timeoutId);
      this.currentJob = null;
      this.abortController = null;
      this.completionResolve?.();
    }
  }

  /** Abort the in-flight job, if any. */
  cancel(): void {
    this.abortController?.abort(new Error('Worker cancelled'));
  }

  /** Resolves when the current job finishes; immediate if idle. */
  waitForCompletion(): Promise<void> {
    return this.completionPromise || Promise.resolve();
  }
}
3. Retry Strategies
// ─── Retry Strategy Implementations ────────────────────
/**
 * Stateless retry-delay helpers. All delays are in milliseconds; the
 * randomized variants intentionally return a different value per call.
 */
class RetryEngine {
  /**
   * Full jitter (AWS recommendation):
   *   sleep = random_between(0, min(cap, base * 2^attempt))
   */
  static exponentialWithJitter(
    attempt: number,
    baseDelay: number = 1000,
    maxDelay: number = 300_000
  ): number {
    const cap = Math.min(maxDelay, baseDelay * Math.pow(2, attempt));
    return Math.floor(Math.random() * cap);
  }

  /**
   * Decorrelated jitter — de-correlates successive retries better than
   * full jitter:
   *   sleep = min(cap, random_between(base, prev_sleep * 3))
   */
  static decorrelatedJitter(
    attempt: number,
    prevDelay: number,
    baseDelay: number = 1000,
    maxDelay: number = 300_000
  ): number {
    const ceiling = Math.min(maxDelay, prevDelay * 3);
    const span = ceiling - baseDelay;
    return Math.floor(baseDelay + Math.random() * span);
  }

  /**
   * Circuit-breaker gate: stop retrying once the observed failure rate
   * crosses `threshold`. Always allows retries until 10 samples exist.
   */
  static shouldRetryWithCircuitBreaker(
    recentFailures: number,
    recentTotal: number,
    threshold: number = 0.5
  ): boolean {
    const enoughData = recentTotal >= 10;
    if (!enoughData) return true; // Not enough data
    return recentFailures / recentTotal < threshold;
  }
}
// ─── Retry Policy per Job Type ─────────────────────────
/**
 * Per-job-type backoff presets. Keys match the names jobs are enqueued
 * under; values plug directly into Job.backoffStrategy.
 */
const retryPolicies: Record<string, BackoffStrategy> = {
  // Email: retry quickly, not many attempts
  'send-email': {
    type: 'exponential',
    initialDelay: 5_000,  // 5s
    maxDelay: 60_000,     // 1 min
    factor: 2,
    jitterPercent: 20,
  },
  // Payment: retry with longer delays, many attempts
  'process-payment': {
    type: 'exponential',
    initialDelay: 30_000,   // 30s
    maxDelay: 3_600_000,    // 1 hour
    factor: 3,
    jitterPercent: 30,
  },
  // Webhook: respect rate limits, slow backoff
  'send-webhook': {
    type: 'exponential',
    initialDelay: 60_000,    // 1 min
    maxDelay: 86_400_000,    // 24 hours
    factor: 4,
    jitterPercent: 25,
  },
  // Data sync: fixed interval retries
  'sync-inventory': {
    type: 'fixed',
    initialDelay: 300_000, // 5 min
    maxDelay: 300_000,
    jitterPercent: 10,
  },
};
Retry Timeline Visualization
Exponential Backoff (base=1s, factor=2)
Attempt 1 Attempt 2 Attempt 3 Attempt 4
│ │ │ │
├─ 1s delay ──┤── 2s delay ────┤───── 4s delay ──────────┤ 8s delay ...
▼ ▼ ▼ ▼
[FAIL] [FAIL] [FAIL] [FAIL]
Exponential Backoff + Full Jitter
Attempt 1 Attempt 2 Attempt 3 Attempt 4
│ │ │ │
├─ 0.7s ─────┤─ 1.3s ────┤──── 2.8s ────────┤── 5.1s ...
▼ ▼ ▼ ▼
[FAIL] [FAIL] [FAIL] [FAIL]
Jitter de-correlates retries from multiple workers
hitting the same downstream service simultaneously:
Without jitter: With jitter:
W1: ──*────*────────*─── W1: ──*───*──────────*──
W2: ──*────*────────*─── W2: ───*────*─────*─────
W3: ──*────*────────*─── W3: ─*──────*──────*────
↑ ↑ ↑ Spread out = less
Thundering herd! pressure on service
4. Dead Letter Queue & Error Handling
// ─── Dead Letter Queue Management ──────────────────────
/**
 * Holding area for jobs that exhausted their retries. Supports inspection,
 * manual/bulk re-enqueue, and time-based purging.
 */
class DeadLetterQueue {
  private deadJobs: Job[] = [];

  // ── Receive Dead Job ─────────────────────────────
  /** Accept a dead job from the queue and alert operators. */
  receive(job: Job): void {
    this.deadJobs.push(job);
    this.notifyAdmins(job);
  }

  // ── Inspect Dead Jobs ────────────────────────────
  /**
   * Flattened summaries for an admin UI, optionally filtered by job name
   * and/or failure time. Payloads are truncated to 200 chars.
   */
  list(filter?: Partial<{ name: string; since: number }>): DeadJobSummary[] {
    let jobs = this.deadJobs;
    if (filter?.name) {
      jobs = jobs.filter(j => j.name === filter.name);
    }
    if (filter?.since) {
      jobs = jobs.filter(j => j.failedAt! >= filter.since);
    }
    return jobs.map(j => ({
      id: j.id,
      name: j.name,
      error: j.error || 'Unknown',
      attempts: j.attempts,
      failedAt: new Date(j.failedAt!).toISOString(),
      data: JSON.stringify(j.data).slice(0, 200),
    }));
  }

  // ── Retry Dead Job ──────────────────────────────
  /**
   * Re-enqueue a dead job with a fresh attempt budget and remove it from
   * the DLQ.
   * @throws Error if `jobId` is not in the DLQ.
   */
  async retry(
    jobId: string,
    queue: JobQueue,
    overrides?: { maxAttempts?: number; data?: unknown }
  ): Promise<Job> {
    const deadJob = this.deadJobs.find(j => j.id === jobId);
    if (!deadJob) throw new Error(`Dead job ${jobId} not found`);
    // Re-enqueue with fresh attempts. Use ?? (not ||) so falsy-but-valid
    // override payloads (0, '', false) are honored, and carry over the
    // original backoff/timeout/retention so the retried job behaves like
    // the original rather than reverting to queue defaults.
    const newJob = await queue.enqueue(
      deadJob.name,
      overrides?.data ?? deadJob.data,
      {
        priority: deadJob.priority,
        maxAttempts: overrides?.maxAttempts ?? deadJob.maxAttempts,
        backoff: deadJob.backoffStrategy,
        timeout: deadJob.timeout,
        retainFor: deadJob.retainFor,
      }
    );
    // Remove from DLQ
    this.deadJobs = this.deadJobs.filter(j => j.id !== jobId);
    return newJob;
  }

  // ── Bulk Retry ───────────────────────────────────
  /** Retry every dead job with the given name; reports per-job outcomes. */
  async retryAll(
    filter: { name: string },
    queue: JobQueue
  ): Promise<{ retried: number; failed: number }> {
    const matching = this.deadJobs.filter(j => j.name === filter.name);
    let retried = 0, failed = 0;
    for (const job of matching) {
      try {
        await this.retry(job.id, queue);
        retried++;
      } catch {
        failed++;
      }
    }
    return { retried, failed };
  }

  // ── Purge Old Dead Jobs ──────────────────────────
  /** Drop dead jobs that failed more than `olderThanMs` ago; returns count. */
  purge(olderThanMs: number): number {
    const cutoff = Date.now() - olderThanMs;
    const before = this.deadJobs.length;
    this.deadJobs = this.deadJobs.filter(j => j.failedAt! > cutoff);
    return before - this.deadJobs.length;
  }

  private notifyAdmins(job: Job): void {
    console.error(
      `[DLQ] Job ${job.name} (${job.id}) exhausted ${job.maxAttempts} ` +
      `attempts. Last error: ${job.error}`
    );
  }
}
/** Admin-friendly flattened view of a dead job. */
interface DeadJobSummary {
  id: string;
  name: string;
  error: string;    // last recorded failure message, or 'Unknown'
  attempts: number;
  failedAt: string; // ISO-8601 timestamp
  data: string;     // JSON payload preview, truncated to 200 chars
}
5. Job Scheduling (Cron-like)
// ─── Job Scheduler ─────────────────────────────────────
/**
 * Cron-like scheduler: checks registered schedules once a second and
 * enqueues any that are due.
 */
class JobScheduler {
  private schedules: ScheduleEntry[] = [];
  private timer: ReturnType<typeof setInterval> | null = null;

  constructor(private queue: JobQueue) {}

  // ── Register Scheduled Job ───────────────────────
  /** Add a recurring schedule; first run at the next cron occurrence. */
  schedule(entry: ScheduleConfig): void {
    this.schedules.push({
      ...entry,
      lastRun: null,
      nextRun: this.calculateNextRun(entry.cron),
    });
  }

  // ── Start Scheduler ────────────────────────────
  start(): void {
    // Check every second for due jobs. tick() is async and setInterval
    // discards its promise, so catch here to avoid unhandled rejections.
    this.timer = setInterval(() => {
      this.tick().catch(err => console.error('[scheduler] tick failed:', err));
    }, 1000);
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer);
  }

  /** Enqueue every due schedule; one broken schedule must not block the rest. */
  private async tick(): Promise<void> {
    const now = Date.now();
    for (const schedule of this.schedules) {
      if (schedule.nextRun > now) continue;
      try {
        // Ensure only one instance runs (leader election / lock)
        if (schedule.unique) {
          const acquired = await this.acquireLock(schedule.name, schedule.cron);
          if (!acquired) continue; // another node owns this run
        }
        await this.queue.enqueue(schedule.name, schedule.data || {}, {
          priority: schedule.priority || 'default',
          maxAttempts: schedule.maxAttempts || 3,
        });
        schedule.lastRun = now;
      } catch (err) {
        // Isolate the failure so remaining schedules still fire this tick.
        console.error(`[scheduler] failed to enqueue ${schedule.name}:`, err);
      } finally {
        // Always advance nextRun — even on failure or a lost lock — so a
        // persistently failing schedule cannot hot-loop on every tick.
        schedule.nextRun = this.calculateNextRun(schedule.cron);
      }
    }
  }

  // ── Simplified Cron Parser ───────────────────────
  /**
   * Simplified syntax: supports "every Xs", "every Xm", "every Xh".
   * Anything else falls back to one minute from now.
   */
  private calculateNextRun(cron: string): number {
    const match = cron.match(/^every (\d+)(s|m|h)$/);
    if (match) {
      const value = parseInt(match[1]);
      const unit = match[2];
      const ms = unit === 's' ? value * 1000
        : unit === 'm' ? value * 60000
        : value * 3600000;
      return Date.now() + ms;
    }
    // Full cron parsing would use a library like 'cron-parser'
    return Date.now() + 60000;
  }

  private async acquireLock(name: string, cron: string): Promise<boolean> {
    // In production: use distributed lock (Redis SET NX)
    return true;
  }
}
/** Declarative definition of a recurring job. */
interface ScheduleConfig {
  name: string;
  cron: string;               // simplified syntax, e.g. "every 30s"
  data?: unknown;             // payload passed to every enqueued run
  priority?: Job['priority'];
  maxAttempts?: number;
  unique?: boolean; // Only one instance across cluster
}
/** Runtime state the scheduler tracks per registered schedule. */
interface ScheduleEntry extends ScheduleConfig {
  lastRun: number | null; // epoch ms of last enqueue; null before first run
  nextRun: number;        // epoch ms when the job is next due
}
6. Redis-Backed Job Queue
// ─── Redis-Backed Reliable Queue ───────────────────────
// Uses Redis sorted sets for priority + scheduling
// Reliable dequeue via an atomic Lua script (ZPOPMIN + move into a processing
// set with a visibility timeout); the list-based equivalent is BLMOVE, which
// replaced the deprecated BRPOPLPUSH in Redis 6.2
/**
 * Redis-backed queue built on sorted sets:
 *   queue:scheduled  — delayed jobs, score = scheduledAt (epoch ms)
 *   queue:ready      — runnable jobs, score = priority weight + FIFO tiebreak
 *   queue:processing — in-flight jobs, score = lock expiry (epoch ms)
 *   queue:dead       — exhausted jobs, score = time of death
 * Job bodies are stored as JSON at `job:<id>`.
 */
class RedisJobQueue {
  constructor(private redis: RedisClient) {}
  // ── Enqueue ──────────────────────────────────────
  /** Persist the job body and file its id under scheduled or ready. */
  async enqueue(job: Job): Promise<void> {
    const serialized = JSON.stringify(job);
    // Store job data
    await this.redis.set(`job:${job.id}`, serialized);
    if (job.scheduledAt > Date.now()) {
      // Delayed: add to scheduled sorted set (score = scheduledAt)
      await this.redis.zadd('queue:scheduled', job.scheduledAt, job.id);
    } else {
      // Ready: add to priority queue (score = priority weight)
      const score = this.priorityScore(job.priority, job.createdAt);
      await this.redis.zadd('queue:ready', score, job.id);
    }
  }
  // ── Dequeue (Atomic with Lua) ────────────────────
  // NOTE(review): known rough edges in the Lua script below — confirm before
  // relying on it in production:
  //  * promoted scheduled jobs all receive the default score (ARGV[2]), so a
  //    delayed 'critical' job loses its priority on promotion;
  //  * "Update job status" in the script is a comment only — the status write
  //    happens client-side after eval, so it is NOT atomic with the pop;
  //  * numkeys is 0 with hard-coded key names, which works on a single node
  //    but is not Redis Cluster-safe (keys should be declared via KEYS[]);
  //  * the processing lock uses the hard-coded ARGV[3] (30s), not job.timeout.
  async dequeue(workerId: string): Promise<Job | null> {
    // Atomically: pop from ready queue + add to processing set
    const luaScript = `
      -- Promote scheduled jobs first
      local now = tonumber(ARGV[1])
      local due = redis.call('ZRANGEBYSCORE', 'queue:scheduled', 0, now, 'LIMIT', 0, 10)
      for i, jobId in ipairs(due) do
        redis.call('ZREM', 'queue:scheduled', jobId)
        local job = redis.call('GET', 'job:' .. jobId)
        if job then
          local parsed = cjson.decode(job)
          local score = tonumber(ARGV[2]) -- default priority score
          redis.call('ZADD', 'queue:ready', score, jobId)
        end
      end
      -- Pop highest priority job (lowest score)
      local result = redis.call('ZPOPMIN', 'queue:ready', 1)
      if #result == 0 then return nil end
      local jobId = result[1]
      local lockUntil = now + tonumber(ARGV[3]) -- timeout
      -- Add to processing set with lock expiry
      redis.call('ZADD', 'queue:processing', lockUntil, jobId)
      -- Update job status
      local job = redis.call('GET', 'job:' .. jobId)
      return job
    `;
    const result = await this.redis.eval(
      luaScript,
      0,
      Date.now().toString(),
      '50', // default priority score
      '30000' // 30s timeout
    );
    if (!result) return null;
    // Claim bookkeeping happens here, after the eval (see NOTE above).
    const job: Job = JSON.parse(result as string);
    job.status = 'running';
    job.workerId = workerId;
    job.startedAt = Date.now();
    job.attempts++;
    await this.redis.set(`job:${job.id}`, JSON.stringify(job));
    return job;
  }
  // ── Heartbeat / Extend Lock ──────────────────────
  /** Push the processing-set lock expiry forward for a long-running job. */
  async heartbeat(jobId: string, extensionMs: number = 30_000): Promise<void> {
    const newLock = Date.now() + extensionMs;
    await this.redis.zadd('queue:processing', newLock, jobId);
  }
  // ── Recover Stale Jobs ───────────────────────────
  // Jobs that have been processing longer than their lock
  /**
   * Re-enqueue (or dead-letter) every job whose processing lock has
   * expired; returns how many were handled.
   */
  async recoverStaleJobs(): Promise<number> {
    const now = Date.now();
    // Find jobs whose lock has expired
    const stale = await this.redis.zrangebyscore(
      'queue:processing',
      0,
      now
    );
    let recovered = 0;
    for (const jobId of stale) {
      const jobData = await this.redis.get(`job:${jobId}`);
      if (!jobData) continue;
      const job: Job = JSON.parse(jobData);
      await this.redis.zrem('queue:processing', jobId);
      if (job.attempts < job.maxAttempts) {
        // Re-enqueue for retry
        // NOTE(review): workerId/startedAt from the stale attempt are left on
        // the stored record — consider clearing them here.
        job.status = 'retrying';
        const score = this.priorityScore(job.priority, job.createdAt);
        await this.redis.zadd('queue:ready', score, jobId);
        await this.redis.set(`job:${jobId}`, JSON.stringify(job));
      } else {
        job.status = 'dead';
        await this.redis.zadd('queue:dead', Date.now(), jobId);
        await this.redis.set(`job:${jobId}`, JSON.stringify(job));
      }
      recovered++;
    }
    return recovered;
  }
  // Priority scoring: lower score = higher priority
  /**
   * Map priority to a score band, tie-breaking FIFO by creation time.
   * NOTE(review): `createdAt % 1_000_000_000` wraps roughly every 11.5 days,
   * so FIFO ordering within a band breaks across the modulus boundary.
   */
  private priorityScore(
    priority: Job['priority'],
    createdAt: number
  ): number {
    const weights: Record<string, number> = {
      critical: 0,
      high: 1_000_000_000,
      default: 2_000_000_000,
      low: 3_000_000_000,
    };
    // Within same priority, FIFO by createdAt
    return weights[priority] + (createdAt % 1_000_000_000);
  }
}
// Minimal Redis client interface
/** Subset of Redis commands used by RedisJobQueue. */
interface RedisClient {
  get(key: string): Promise<string | null>;
  set(key: string, value: string): Promise<void>;
  zadd(key: string, score: number, member: string): Promise<void>;
  zrem(key: string, member: string): Promise<void>;
  zrangebyscore(key: string, min: number, max: number): Promise<string[]>;
  eval(script: string, numkeys: number, ...args: string[]): Promise<unknown>;
}
7. Job System Observability
// ─── Job Metrics & Monitoring ──────────────────────────
/**
 * In-process metrics for the job system: monotonic counters, simple
 * histograms (raw samples), and gauges (deltas folded into a running value).
 * The onJob* hooks are intended to be wired to JobQueue events.
 */
class JobMetrics {
  private counters = new Map<string, number>();
  private histograms = new Map<string, number[]>();
  private gauges = new Map<string, number>();

  // ── Track Job Lifecycle ──────────────────────────
  /** Record a job entering the queue. */
  onJobEnqueued(job: Job): void {
    this.inc(`jobs.enqueued.total`);
    this.inc(`jobs.enqueued.${job.name}`);
    this.gauge(`queue.size.${job.priority}`, 1);
  }

  /** Record a job being picked up; tracks time spent waiting in queue. */
  onJobStarted(job: Job): void {
    this.inc(`jobs.started.total`);
    const waitTime = job.startedAt! - job.createdAt;
    this.histogram(`jobs.wait_time.${job.name}`, waitTime);
  }

  /** Record success; tracks execution duration. */
  onJobCompleted(job: Job): void {
    this.inc(`jobs.completed.total`);
    this.inc(`jobs.completed.${job.name}`);
    const duration = job.completedAt! - job.startedAt!;
    this.histogram(`jobs.duration.${job.name}`, duration);
    this.gauge(`queue.size.${job.priority}`, -1);
  }

  /** Record a failure; dead jobs are additionally counted and de-gauged. */
  onJobFailed(job: Job): void {
    this.inc(`jobs.failed.total`);
    this.inc(`jobs.failed.${job.name}`);
    if (job.status === 'dead') {
      this.inc(`jobs.dead.total`);
      this.inc(`jobs.dead.${job.name}`);
      // A dead job never reaches onJobCompleted, so without this the
      // enqueue-time +1 would never be reversed and the queue-size gauge
      // would drift upward forever.
      this.gauge(`queue.size.${job.priority}`, -1);
    }
  }

  // ── Dashboard Data ───────────────────────────────
  /** Aggregate snapshot: global summary plus per-job-name breakdown. */
  getDashboard(): JobDashboard {
    const jobNames = new Set<string>();
    for (const key of this.counters.keys()) {
      const match = key.match(/jobs\.enqueued\.(.+)/);
      if (match && match[1] !== 'total') {
        jobNames.add(match[1]);
      }
    }
    return {
      summary: {
        totalEnqueued: this.get('jobs.enqueued.total'),
        totalCompleted: this.get('jobs.completed.total'),
        totalFailed: this.get('jobs.failed.total'),
        totalDead: this.get('jobs.dead.total'),
        // max(1, …) guards the divide-by-zero before any job has finished.
        successRate: this.get('jobs.completed.total') /
          Math.max(1, this.get('jobs.completed.total') + this.get('jobs.failed.total')),
      },
      perJob: Array.from(jobNames).map(name => ({
        name,
        enqueued: this.get(`jobs.enqueued.${name}`),
        completed: this.get(`jobs.completed.${name}`),
        failed: this.get(`jobs.failed.${name}`),
        dead: this.get(`jobs.dead.${name}`),
        avgDuration: this.avg(`jobs.duration.${name}`),
        p99Duration: this.percentile(`jobs.duration.${name}`, 0.99),
        avgWaitTime: this.avg(`jobs.wait_time.${name}`),
      })),
    };
  }

  private inc(key: string): void {
    this.counters.set(key, (this.counters.get(key) || 0) + 1);
  }

  private get(key: string): number { return this.counters.get(key) || 0; }

  private gauge(key: string, delta: number): void {
    this.gauges.set(key, (this.gauges.get(key) || 0) + delta);
  }

  private histogram(key: string, value: number): void {
    if (!this.histograms.has(key)) this.histograms.set(key, []);
    this.histograms.get(key)!.push(value);
  }

  private avg(key: string): number {
    const values = this.histograms.get(key) || [];
    return values.length > 0 ? values.reduce((a, b) => a + b) / values.length : 0;
  }

  /** Nearest-rank percentile over recorded samples (0 when empty). */
  private percentile(key: string, p: number): number {
    // Copy before sorting — Array.sort mutates, and the stored samples
    // should keep their insertion order.
    const values = [...(this.histograms.get(key) || [])].sort((a, b) => a - b);
    if (values.length === 0) return 0;
    const idx = Math.ceil(values.length * p) - 1;
    return values[idx];
  }
}
/** Aggregate metrics snapshot produced by JobMetrics.getDashboard(). */
interface JobDashboard {
  // successRate is completed / (completed + failed), in [0, 1]
  summary: { totalEnqueued: number; totalCompleted: number; totalFailed: number; totalDead: number; successRate: number };
  // One entry per distinct job name seen by the metrics hooks.
  perJob: { name: string; enqueued: number; completed: number; failed: number; dead: number; avgDuration: number; p99Duration: number; avgWaitTime: number }[];
}
Comparison: Job Processing Systems
| System | Language | Queue Backend | Priority | Delayed Jobs | Cron | Rate Limiting | Dashboard |
|---|---|---|---|---|---|---|---|
| Bull/BullMQ | Node.js | Redis | Yes | Yes | Yes | Yes | Bull Board |
| Celery | Python | Redis/RabbitMQ | Yes | Yes | Yes (Beat) | Yes | Flower |
| Sidekiq | Ruby | Redis | Yes | Yes | Yes | Yes | Web UI |
| Temporal | Any | Own DB | N/A | Yes | Yes | N/A | Web UI |
| AWS SQS + Lambda | Any | Managed | No (standard queues have no priorities; use separate queues per priority) | Yes | Via EventBridge | Via Lambda | CloudWatch |
| Kafka + Workers | Any | Kafka | Via topics | Via timestamp | External | Consumer lag | Kafka UI |
Comparison: Retry Strategies
| Strategy | Formula | Pros | Cons | Best For |
|---|---|---|---|---|
| Fixed | delay = D | Simple, predictable | Thundering herd risk | Periodic sync tasks |
| Linear | delay = D × attempt | Gentle increase | Still clusters | Low-traffic systems |
| Exponential | delay = D × 2^attempt | Spreads load quickly | Can over-wait | Most cases |
| Exp + Full Jitter | rand(0, D × 2^attempt) | Best spread | Unpredictable delays | High contention |
| Decorrelated Jitter | rand(D, prev × 3) | Best de-correlation | Complex | Multiple clients |
| Circuit Breaker | Stop if failure rate > X% | Protects downstream | Can starve queue | External APIs |
Interview Questions & Answers
Q1: How do you guarantee exactly-once processing in a job system?
A: True exactly-once is impossible in distributed systems, but you can achieve effectively-once via idempotency. The pattern: (1) Assign each job a unique ID at creation. (2) Before processing, check if this job ID has already been processed (idempotency key in a database). (3) Process the job and record the idempotency key in the same transaction. (4) If the worker crashes mid-processing and the job is retried, the idempotency check prevents double-processing. For jobs with side effects (sending email, charging payment), make the side effect itself idempotent — use idempotency keys with the payment provider, use INSERT ... ON CONFLICT DO NOTHING for database operations. Redis-backed queues use BRPOPLPUSH (or ZPOPMIN + ZADD) to atomically move a job from the ready queue to the processing set, ensuring no two workers pick up the same job.
Q2: How do you handle poison pill messages (jobs that always fail)?
A: Poison pills are jobs that crash the worker every time they run, causing an infinite retry loop. Defense: (1) Max attempt limit (e.g., 3-5 retries). After exhaustion, move to dead letter queue. (2) Circuit breaker on the job type — if the same job name fails X times in Y minutes, pause all jobs of that type. (3) Per-job timeout to prevent workers from hanging. (4) Worker isolation — run each job in a sandbox/subprocess so a crash doesn't take down the whole worker pool. (5) Alerting on DLQ growth rate. In BullMQ, the attempts and backoff options handle this. In SQS, the maxReceiveCount and dead letter queue ARN handle this natively.
Q3: What's the difference between a task queue (Celery/Bull) and a workflow engine (Temporal)?
A: Task queues handle independent units of work: enqueue a job, process it, done. Workflows handle multi-step processes with state: step 1 → step 2 → decision → step 3. Specific differences: (1) State management: Task queues are stateless between jobs. Temporal maintains workflow state across steps and can sleep for days/months between steps. (2) Failure handling: Task queues retry individual jobs. Temporal can replay entire workflow histories to recover from any point. (3) Orchestration: Task queues need external coordination (saga pattern) for multi-step processes. Temporal handles this natively with workflow.execute(). (4) Complexity: Task queues have minimal overhead. Temporal has a steeper learning curve and operational cost. Use task queues for independent operations (send email, resize image). Use Temporal for multi-step business processes (order fulfillment, user onboarding, data pipelines).
Q4: How do you handle job queue backpressure?
A: When jobs enqueue faster than workers process them: (1) Monitor queue depth — alert when queue size exceeds threshold. (2) Auto-scaling workers — increase concurrency based on queue depth. Cloud platforms (AWS ECS, K8s HPA) can scale on custom metrics like queue size. (3) Rate limit producers — reject or throttle new job submissions when queue is too deep. Return 429 or 503 and let the client retry. (4) Priority-based shedding — when overloaded, drop low-priority jobs or defer them. Process only critical/high priority. (5) Backpressure signal — producers check queue depth before enqueuing and slow down voluntarily. (6) Separate queues per priority — high-priority jobs aren't blocked behind a backlog of low-priority work. BullMQ supports rate limiting via limiter: { max: 100, duration: 1000 }.
Q5: How do you ensure graceful shutdown of a worker pool?
A: Graceful shutdown means finishing in-progress jobs without starting new ones. The process: (1) Signal handling: Catch SIGTERM/SIGINT. Stop polling for new jobs immediately. (2) Drain phase: Wait for currently-running jobs to complete, with a configurable timeout (e.g., 30s). (3) Heartbeat extension: Extend the visibility timeout / lock on in-progress jobs so another worker doesn't pick them up during shutdown. (4) Force kill: After the grace period, abort remaining jobs. These will be picked up by another worker when their lock expires. (5) Connection cleanup: Close Redis/DB connections cleanly. In Kubernetes: the preStop hook + terminationGracePeriodSeconds gives the pod time to drain. The readiness probe should immediately fail so no new traffic arrives.
Real-World Problems & How to Solve Them
Problem 1: Duplicate side effects after a worker crash
Symptom: An order is charged twice or a user receives duplicate emails after retries.
Root cause: The job lock (lockUntil) expires and another worker dequeues the same job while the first attempt already executed an external side effect. Without idempotency, at-least-once delivery becomes duplicate execution.
Fix — Add a durable idempotency gate per business action:
// Payload carried by a payment job through the queue.
interface PaymentJobData {
  orderId: string;
  // Charge amount in integer cents — avoids floating-point currency errors.
  amountCents: number;
  // Caller-supplied key making the charge safe to re-execute under
  // at-least-once delivery (same key => at most one real charge).
  idempotencyKey: string;
}
// Durable record of business actions, keyed by idempotency key.
interface IdempotencyStore {
  // Atomically marks `key` as started; returns true only for the first caller.
  // false means the action already ran or is in progress. `ttlMs` bounds how
  // long the record is retained. NOTE(review): presumably backed by an atomic
  // set-if-absent (e.g. Redis SET NX PX) — confirm the implementation.
  begin(key: string, ttlMs: number): Promise<boolean>;
  // Marks the action as successfully finished.
  complete(key: string): Promise<void>;
  // Records a failure with a reason; presumably clears the key so a retry can
  // call begin() again — confirm against the store implementation.
  fail(key: string, reason: string): Promise<void>;
}
/**
 * Processes a payment job at most once per idempotency key.
 *
 * A duplicate delivery (lock expiry, worker crash, redelivery) is detected via
 * the idempotency store and silently skipped, so the external charge is not
 * repeated. On failure the key is marked failed and the error is rethrown so
 * the queue's retry machinery runs.
 */
async function processPaymentJob(
  job: Job<PaymentJobData>,
  deps: {
    gateway: { charge(orderId: string, amountCents: number): Promise<void> };
    orders: { markPaid(orderId: string): Promise<void> };
    idempotency: IdempotencyStore;
  },
): Promise<void> {
  const { orderId, amountCents, idempotencyKey } = job.data;
  const key = `payment:${idempotencyKey}`;
  const oneDayMs = 24 * 60 * 60 * 1000;

  // Someone else already started (or finished) this exact payment — no-op.
  const firstExecution = await deps.idempotency.begin(key, oneDayMs);
  if (!firstExecution) {
    return;
  }

  try {
    await deps.gateway.charge(orderId, amountCents);
    await deps.orders.markPaid(orderId);
    await deps.idempotency.complete(key);
  } catch (error) {
    // Record the failure, then let the queue's retry/backoff handle it.
    await deps.idempotency.fail(key, String(error));
    throw error;
  }
}
Problem 2: Retry storms overload downstream services
Symptom: After a short outage, thousands of jobs retry at once and cause another outage.
Root cause: Retries use synchronized delays. If all failures happen together, they all retry together. Backoff without jitter still creates herd behavior.
Fix — Use exponential backoff with jitter and max delay caps:
/**
 * Computes the delay before retry attempt `nowAttempt` (1-based) using
 * exponential backoff with symmetric jitter, capped at the strategy's
 * maxDelay and floored at 250ms.
 */
function nextRetryDelayMs(job: Job, nowAttempt: number): number {
  const strategy = job.backoffStrategy;
  const growth = strategy.factor ?? 2;
  const exponent = Math.max(0, nowAttempt - 1);
  const uncapped = strategy.initialDelay * growth ** exponent;
  const base = Math.min(strategy.maxDelay, uncapped);

  // Random offset in [-jitterPct, +jitterPct] of base de-synchronizes
  // retries so a herd of simultaneous failures doesn't retry in lockstep.
  const jitterPct = (strategy.jitterPercent ?? 0) / 100;
  const jitter = base * jitterPct * (Math.random() * 2 - 1);

  return Math.max(250, Math.round(base + jitter));
}
// Re-enqueues a failed job as a fresh delayed job with backoff applied.
// NOTE(review): the retry is enqueued as a brand-new job; unless JobQueue's
// enqueue options carry the prior attempt count, the new job's `attempts`
// resets to 0 and `maxAttempts` may never exhaust — confirm JobQueue
// semantics before relying on this.
async function scheduleRetry(queue: JobQueue, job: Job): Promise<void> {
  // Delay is computed for the attempt about to run (hence attempts + 1).
  const delay = nextRetryDelayMs(job, job.attempts + 1);
  await queue.enqueue(job.name, job.data, {
    priority: job.priority,
    delay,
    maxAttempts: job.maxAttempts,
    backoff: job.backoffStrategy,
  });
}
Problem 3: Low-priority jobs never finish
Symptom: Critical jobs are healthy, but low/default jobs stay in queue for hours.
Root cause: Strict dequeue order (critical -> high -> default -> low) can starve lower tiers during sustained high-priority load.
Fix — Switch to weighted fairness instead of strict priority-only dequeue:
/** Dequeue priority level, derived from the Job's own priority field. */
type Priority = Job["priority"];

/**
 * Cycles through priority levels with 2:2:2:1 weighting so low-priority work
 * still gets roughly one of every seven picks instead of being starved by a
 * strict critical -> low ordering.
 */
class WeightedPriorityPicker {
  // Fixed weighted ring; repetition count encodes the weight.
  private ring: Priority[] = [
    "critical", "critical", "high", "high", "default", "default", "low",
  ];
  private index = 0;

  /** Returns the current level and advances the cursor, wrapping at the end. */
  next(): Priority {
    const level = this.ring[this.index];
    this.index = this.index + 1 === this.ring.length ? 0 : this.index + 1;
    return level;
  }
}
/**
 * Pops the next job according to the weighted picker. Sweeps up to 8 picks
 * (>= the picker's ring size) so every priority level is consulted at least
 * once before concluding all queues are empty.
 */
function dequeueFair(queues: Map<Priority, Job[]>, picker: WeightedPriorityPicker): Job | null {
  let remaining = 8;
  while (remaining > 0) {
    remaining -= 1;
    const bucket = queues.get(picker.next());
    if (bucket !== undefined && bucket.length > 0) {
      return bucket.shift()!;
    }
  }
  return null;
}
Problem 4: Scheduled jobs are delayed indefinitely
Symptom: Jobs with delay run minutes late unless workers are constantly polling.
Root cause: Delayed jobs are promoted only during dequeue paths. If workers are idle/restarting, scheduled jobs remain in delayed storage longer than expected.
Fix — Run a dedicated delayed-job promoter loop:
// A queue that can move delayed jobs whose run-at time has passed into the
// ready queue, independent of worker dequeue activity.
interface DelayAwareQueue {
  // Promotes every job due at or before `now` (epoch ms). NOTE(review): the
  // returned number presumably counts promoted jobs — confirm implementation.
  promoteDueJobs(now: number): Promise<number>;
}
/**
 * Background loop that promotes due delayed jobs on a fixed interval, so
 * scheduled jobs run on time even when all workers are idle or restarting.
 */
class DelayedJobPromoter {
  private timer: NodeJS.Timeout | null = null;

  constructor(private queue: DelayAwareQueue, private intervalMs = 250) {}

  /** Starts polling. Idempotent: a second start() is a no-op. */
  start(): void {
    // Fix: previously a repeated start() overwrote this.timer and leaked the
    // old interval, which kept firing forever with no handle to clear it.
    if (this.timer) return;
    this.timer = setInterval(() => {
      // Fire-and-forget; promotion errors are intentionally not awaited here.
      void this.queue.promoteDueJobs(Date.now());
    }, this.intervalMs);
    // Don't let the promoter alone keep the process alive (Node-only API,
    // hence the optional call).
    this.timer.unref?.();
  }

  /** Stops polling; safe to call when not started. */
  stop(): void {
    if (this.timer) clearInterval(this.timer);
    this.timer = null;
  }
}
Problem 5: Recurring jobs duplicate after deploys
Symptom: A cron-like job runs 2-3 times for the same schedule window after restarts.
Root cause: Recurrence logic can re-register scheduling callbacks and enqueue duplicates if not guarded by a deterministic key and lock.
Fix — Use a unique schedule key + distributed lock before enqueue:
// Minimal distributed lock. NOTE(review): presumably backed by an atomic
// set-if-absent with expiry (e.g. Redis SET NX PX) — confirm.
interface LockStore {
  // Returns true if the lock was acquired; the lock auto-expires after ttlMs.
  acquire(key: string, ttlMs: number): Promise<boolean>;
  // Releases the lock before its TTL expires.
  release(key: string): Promise<void>;
}
/**
 * Enqueues exactly one instance of a recurring job per (jobName, schedule
 * window), guarded by a distributed lock so overlapping schedulers (e.g.
 * during a rolling deploy) cannot double-enqueue the same window.
 *
 * Fix: the lock is intentionally NOT released on success. The original
 * released it in `finally` immediately after enqueue, which re-opened the
 * dedupe window — a second scheduler firing moments later for the same cron
 * window would re-acquire the lock and enqueue a duplicate. Holding the lock
 * until its TTL expires covers the window; it is released only when enqueue
 * fails, so a retry for that window isn't blocked for the full TTL.
 * NOTE(review): the 30s TTL must be >= the schedule window / scheduler skew
 * for the dedupe to hold — confirm against the cron granularity.
 */
async function enqueueRecurringSafely(
  queue: JobQueue,
  lockStore: LockStore,
  jobName: string,
  cronWindowEpochMs: number,
  payload: unknown,
): Promise<void> {
  // Deterministic key: the same job + same window always maps to one lock.
  const windowKey = `${jobName}:${cronWindowEpochMs}`;
  const lockKey = `recurring-lock:${windowKey}`;
  const gotLock = await lockStore.acquire(lockKey, 30_000);
  if (!gotLock) return; // another scheduler already owns this window
  try {
    await queue.enqueue(jobName, payload, {
      priority: "default",
      maxAttempts: 3,
    });
  } catch (error) {
    // Enqueue failed: free the lock so a later attempt can retry this window.
    await lockStore.release(lockKey);
    throw error;
  }
}
Problem 6: Deployments drop in-flight jobs
Symptom: During rolling deploys, some jobs fail with lock timeouts or disappear until manual retry.
Root cause: Workers exit on SIGTERM before finishing or extending locks for running jobs.
Fix — Drain workers gracefully with a shutdown deadline:
/**
 * Minimal worker loop with graceful shutdown: once shutdown() is called, no
 * new work starts and in-flight tasks get up to `graceMs` to drain.
 */
class WorkerRuntime {
  // Once true, runLoop exits after the task currently in progress settles.
  private stopping = false;
  // Tasks currently executing; shutdown() drains a snapshot of this set.
  private inFlight = new Set<Promise<void>>();

  /**
   * Repeatedly invokes `work` until shutdown() is called. Tasks run one at a
   * time (each is awaited); a rejected task propagates out of the loop.
   */
  async runLoop(work: () => Promise<void>): Promise<void> {
    while (!this.stopping) {
      // The finally callback runs only after `task` is initialized, so the
      // self-reference inside the closure is safe.
      const task = work().finally(() => this.inFlight.delete(task));
      this.inFlight.add(task);
      await task;
    }
  }

  /**
   * Stops accepting new work, then waits until all in-flight tasks settle or
   * `graceMs` elapses, whichever comes first.
   */
  async shutdown(graceMs: number): Promise<void> {
    this.stopping = true;
    const drain = Promise.allSettled([...this.inFlight]);
    // Fix: keep a handle on the grace timer and clear it once the race
    // settles. Previously the timeout was never cancelled, so even an
    // instant drain left a live timer keeping the event loop (and process)
    // alive for the full graceMs.
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<"timeout">((resolve) => {
      timer = setTimeout(() => resolve("timeout"), graceMs);
    });
    try {
      await Promise.race([drain, timeout]);
    } finally {
      if (timer !== undefined) clearTimeout(timer);
    }
  }
}
const runtime = new WorkerRuntime();

// Kubernetes (and most supervisors) send SIGTERM first: drain in-flight jobs
// for up to 30s, then exit regardless of drain outcome.
const onSigterm = (): void => {
  void runtime.shutdown(30_000).finally(() => process.exit(0));
};
process.on("SIGTERM", onSigterm);
Key Takeaways
- Background jobs decouple work from request latency — anything >200ms or non-blocking belongs in a job queue
- Priority queues prevent starvation — critical jobs (payment processing) shouldn't wait behind bulk operations (report generation)
- Exponential backoff with jitter is the recommended retry strategy — it prevents thundering herd on downstream services
- Dead letter queues are essential for observability — jobs that exhaust retries must be inspectable and retryable
- Visibility timeout / lock expiry prevents duplicate processing — if a worker crashes, the job automatically returns to the queue
- Idempotency is your exactly-once guarantee — design every job handler to be safely re-executed
- Redis sorted sets enable both priority queues and delayed/scheduled jobs with a single data structure
- Graceful shutdown (drain → timeout → force) prevents data loss during deployments
- Per-job metrics (wait time, processing time, failure rate) are critical for capacity planning and SLA monitoring
- Job systems are not workflow engines — for multi-step orchestration with long-running state, use Temporal or Step Functions
What did you think?