Advanced Concurrency Patterns: Implementing Promise.scheduler, Semaphore, and ReadWriteLock in JavaScript
The Problem
JavaScript is single-threaded but concurrency bugs are everywhere. async/await makes asynchronous code look sequential, hiding the fact that interleaving still happens at every await boundary. Rate-limited APIs need a Semaphore. Shared IndexedDB access needs a ReadWriteLock. Parallel stream synchronization needs a Barrier. Cross-worker shared memory needs a Mutex built on Atomics. Understanding these patterns at the computer science level — not just the syntax level — is what separates senior engineers from staff-level architects.
The Concurrency Model: Why Single-Threaded Doesn't Mean Safe
MYTH: "JavaScript is single-threaded, so I don't have concurrency bugs."
REALITY: Every `await` is a YIELD POINT where other code can run.
async function transferMoney(from, to, amount) {
const balance = await getBalance(from); // ← yield point
// Other code runs here!
// Another transfer could
// read the SAME balance.
if (balance >= amount) {
await setBalance(from, balance - amount); // ← yield point
await setBalance(to, (await getBalance(to)) + amount);
}
}
// Two concurrent calls:
transferMoney(A, B, 100); // reads balance: $150
transferMoney(A, C, 100); // reads balance: $150 (SAME!)
// Both succeed → $200 debited from an account holding $150.
// Classic TOCTOU (Time-of-check to time-of-use) bug.
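The race above can be reproduced deterministically with an in-memory bank. Here `getBalance` and `setBalance` are hypothetical stand-ins for real async storage; both resolve through the microtask queue, which is enough to expose the stale read:

```typescript
// Deterministic reproduction of the TOCTOU race with an in-memory bank.
// getBalance/setBalance are hypothetical stand-ins for real async storage.
const balances = new Map<string, number>([["A", 150], ["B", 0], ["C", 0]]);
const getBalance = async (id: string) => balances.get(id)!;
const setBalance = async (id: string, v: number) => { balances.set(id, v); };

async function transferMoney(from: string, to: string, amount: number) {
  const balance = await getBalance(from); // ← both transfers read here first
  if (balance >= amount) {
    await setBalance(from, balance - amount);
    await setBalance(to, (await getBalance(to)) + amount);
  }
}

async function demo() {
  await Promise.all([
    transferMoney("A", "B", 100),
    transferMoney("A", "C", 100),
  ]);
  return balances;
}

const raced = demo();
raced.then((b) => {
  // Both checks passed against the same stale $150 balance:
  console.log(b.get("A"), b.get("B"), b.get("C")); // 50 100 100 — $100 created from thin air
});
```

Both calls invoke `getBalance` before either has written, so both see $150, both checks pass, and the bank invents money.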
CONCURRENCY MODEL:
──────────────────
┌──────────────────────────────────────────┐
Macrotask Queue: │ task1 │ task2 │ task3 │ ... │
└──────────────────────────────────────────┘
↓
┌──────────────────────────────────────────┐
Microtask Queue: │ .then() │ .then() │ queueMicrotask │... │
└──────────────────────────────────────────┘
↓
Each task runs to completion; an async function runs until its next await.
Between tasks/microtasks, OTHER async operations can interleave.
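The queue ordering in the diagram is easy to observe directly: synchronous code in the current task finishes first, then the microtask queue drains, then the next macrotask runs.

```typescript
// Observing the queues: sync code, then microtasks, then macrotasks.
const order: string[] = [];
setTimeout(() => order.push("macrotask"), 0);          // macrotask queue
Promise.resolve().then(() => order.push("microtask")); // microtask queue
order.push("sync");                                    // current task
setTimeout(() => console.log(order.join(" → ")), 10);  // sync → microtask → macrotask
```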
JavaScript's concurrency is COOPERATIVE (not preemptive):
- Code runs uninterrupted between yield points
- Yield points: await, setTimeout callbacks, event handlers
- No data races on primitives (single thread)
- But LOGICAL races happen at every await boundary
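The yield points are visible in a minimal sketch: each function runs synchronously until its await, then the other gets a turn.

```typescript
// Two async functions interleave ONLY at their await points:
const trace: string[] = [];
async function taskA() { trace.push("a1"); await null; trace.push("a2"); }
async function taskB() { trace.push("b1"); await null; trace.push("b2"); }
const done = Promise.all([taskA(), taskB()]).then(() => trace.join(" "));
done.then(console.log); // "a1 b1 a2 b2" — not "a1 a2 b1 b2"
```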
Mutex: Mutual Exclusion for Async Code
// A Mutex ensures only ONE async operation accesses a
// critical section at a time. Others queue up and wait.
class Mutex {
private locked = false;
private waitQueue: Array<() => void> = [];
async acquire(): Promise<void> {
if (!this.locked) {
this.locked = true;
return;
}
// Already locked — wait in queue:
return new Promise<void>((resolve) => {
this.waitQueue.push(resolve);
});
}
release(): void {
if (this.waitQueue.length > 0) {
// Wake up the next waiter (FIFO fairness):
const next = this.waitQueue.shift()!;
// Don't set locked = false; transfer ownership directly.
next();
} else {
this.locked = false;
}
}
// RAII-style: ensure release even if body throws:
async withLock<T>(fn: () => Promise<T>): Promise<T> {
await this.acquire();
try {
return await fn();
} finally {
this.release();
}
}
}
// Fixed transfer:
const accountMutex = new Mutex();
async function transferMoney(from: string, to: string, amount: number) {
await accountMutex.withLock(async () => {
const balance = await getBalance(from);
if (balance >= amount) {
await setBalance(from, balance - amount);
await setBalance(to, (await getBalance(to)) + amount);
}
});
}
// Now: second transfer waits for the first to complete.
// No interleaving inside the critical section.
Semaphore: Limiting Parallel Async Operations
// A Semaphore is a Mutex generalized to N concurrent holders.
// Use case: Rate-limited APIs that allow at most N concurrent requests.
class Semaphore {
private permits: number;
private waitQueue: Array<() => void> = [];
constructor(private readonly maxPermits: number) {
this.permits = maxPermits;
}
async acquire(): Promise<void> {
if (this.permits > 0) {
this.permits--;
return;
}
return new Promise<void>((resolve) => {
this.waitQueue.push(resolve);
});
}
release(): void {
if (this.waitQueue.length > 0) {
const next = this.waitQueue.shift()!;
// Transfer permit directly to next waiter:
next();
} else {
if (this.permits >= this.maxPermits) {
throw new Error("Semaphore released more times than acquired");
}
this.permits++;
}
}
async withPermit<T>(fn: () => Promise<T>): Promise<T> {
await this.acquire();
try {
return await fn();
} finally {
this.release();
}
}
get available(): number {
return this.permits;
}
get waiting(): number {
return this.waitQueue.length;
}
}
// Usage: Rate-limited API client
const apiSemaphore = new Semaphore(5); // Max 5 concurrent requests
async function fetchWithRateLimit(url: string): Promise<Response> {
return apiSemaphore.withPermit(() => fetch(url));
}
// Process 1000 items with at most 5 concurrent API calls:
async function processAll(items: string[]): Promise<void> {
await Promise.all(
items.map((item) =>
apiSemaphore.withPermit(async () => {
const res = await fetch(`/api/process/${item}`);
return res.json();
})
)
);
}
// Without the semaphore: 1000 simultaneous requests → rate limit errors.
// With semaphore: at most 5 at a time, others queue up.
Weighted Semaphore
// Some operations are "heavier" than others.
// A file upload might consume 3 permits while a metadata fetch uses 1.
class WeightedSemaphore {
private permits: number;
private waitQueue: Array<{ weight: number; resolve: () => void }> = [];
constructor(private readonly maxPermits: number) {
this.permits = maxPermits;
}
async acquire(weight: number = 1): Promise<void> {
if (weight > this.maxPermits) {
throw new Error(`Weight ${weight} exceeds max permits ${this.maxPermits}`);
}
if (this.permits >= weight) {
this.permits -= weight;
return;
}
return new Promise<void>((resolve) => {
this.waitQueue.push({ weight, resolve });
});
}
release(weight: number = 1): void {
this.permits += weight;
// Try to wake up waiters that fit in available permits:
while (this.waitQueue.length > 0) {
const next = this.waitQueue[0];
if (this.permits >= next.weight) {
this.waitQueue.shift();
this.permits -= next.weight;
next.resolve();
} else {
break; // Not enough permits for the next waiter
}
}
}
}
// Usage:
const bandwidth = new WeightedSemaphore(10);
async function uploadFile(file: File) {
const weight = file.size > 10_000_000 ? 5 : 1; // Large files use more bandwidth
await bandwidth.acquire(weight);
try {
await uploadToServer(file);
} finally {
bandwidth.release(weight);
}
}
ReadWriteLock: Shared Resource Access for IndexedDB
// Multiple readers can proceed simultaneously.
// A writer requires exclusive access — blocks all readers and other writers.
//
// READER WRITER
// ┌──────────┐ ┌──────────────┐
// │ Read #1 │ ←──┐ │ │
// │ Read #2 │ ←──┤ │ WAITS... │
// │ Read #3 │ ←──┘ │ │
// └──────────┘ │ │
// All finish │ │
// │ Write (excl) │
// └──────────────┘
class ReadWriteLock {
private readers = 0;
private writer = false;
private readQueue: Array<() => void> = [];
private writeQueue: Array<() => void> = [];
async acquireRead(): Promise<void> {
// Can read if: no active writer AND no waiting writers
// (writer preference to prevent starvation)
if (!this.writer && this.writeQueue.length === 0) {
this.readers++;
return;
}
return new Promise<void>((resolve) => {
this.readQueue.push(resolve);
});
}
releaseRead(): void {
this.readers--;
// If no more readers, wake up a waiting writer:
if (this.readers === 0 && this.writeQueue.length > 0) {
this.writer = true;
const next = this.writeQueue.shift()!;
next();
}
}
async acquireWrite(): Promise<void> {
if (!this.writer && this.readers === 0) {
this.writer = true;
return;
}
return new Promise<void>((resolve) => {
this.writeQueue.push(resolve);
});
}
releaseWrite(): void {
this.writer = false;
// Prefer waking all queued readers (they can run concurrently):
if (this.readQueue.length > 0) {
const readers = [...this.readQueue];
this.readQueue = [];
this.readers = readers.length;
readers.forEach((resolve) => resolve());
} else if (this.writeQueue.length > 0) {
this.writer = true;
const next = this.writeQueue.shift()!;
next();
}
}
async withRead<T>(fn: () => Promise<T>): Promise<T> {
await this.acquireRead();
try {
return await fn();
} finally {
this.releaseRead();
}
}
async withWrite<T>(fn: () => Promise<T>): Promise<T> {
await this.acquireWrite();
try {
return await fn();
} finally {
this.releaseWrite();
}
}
}
// Usage: IndexedDB cache with concurrent reads, exclusive writes
const cacheLock = new ReadWriteLock();
async function readFromCache(key: string): Promise<unknown> {
return cacheLock.withRead(async () => {
const db = await openDB();
const tx = db.transaction("cache", "readonly");
return tx.objectStore("cache").get(key);
});
}
async function writeToCache(key: string, value: unknown): Promise<void> {
return cacheLock.withWrite(async () => {
const db = await openDB();
const tx = db.transaction("cache", "readwrite");
tx.objectStore("cache").put(value, key);
await tx.done;
});
}
// 10 concurrent reads → all proceed simultaneously.
// 1 write → waits for all reads to finish, then runs exclusively.
Barrier: Synchronizing Multiple Async Streams
// A Barrier blocks N async operations until ALL of them arrive.
// Then all proceed simultaneously.
//
// Stream A: ──────▶ arrive ──── WAIT ────▶ proceed
// Stream B: ──▶ arrive ──────── WAIT ────▶ proceed
// Stream C: ────────────▶ arrive ────────▶ proceed (last one, all unblock)
class Barrier {
private count: number;
private waiting: Array<() => void> = [];
private generation = 0; // Reset tracking
constructor(private readonly parties: number) {
this.count = parties;
}
async arrive(): Promise<void> {
this.count--;
if (this.count === 0) {
// Last arrival — release everyone:
this.generation++;
this.count = this.parties; // Reset for reuse
const waiters = [...this.waiting];
this.waiting = [];
waiters.forEach((resolve) => resolve());
return; // Last arriver doesn't wait
}
// Not everyone has arrived — wait:
return new Promise<void>((resolve) => {
this.waiting.push(resolve);
});
}
}
// Usage: Load all critical resources before rendering dashboard
async function loadDashboard() {
const barrier = new Barrier(3);
const results: Record<string, unknown> = {};
// Three parallel data streams:
const loadUser = async () => {
results.user = await fetch("/api/user").then((r) => r.json());
await barrier.arrive(); // Wait for others
console.log("User loaded, all resources ready");
};
const loadMetrics = async () => {
results.metrics = await fetch("/api/metrics").then((r) => r.json());
await barrier.arrive();
console.log("Metrics loaded, all resources ready");
};
const loadConfig = async () => {
results.config = await fetch("/api/config").then((r) => r.json());
await barrier.arrive();
console.log("Config loaded, all resources ready");
};
await Promise.all([loadUser(), loadMetrics(), loadConfig()]);
// All three logs appear at the same time — when the SLOWEST finishes.
// Then render the dashboard with all data guaranteed available.
renderDashboard(results);
}
CancellablePromise with Teardown Propagation
// A Promise you can cancel from the outside, with proper cleanup.
class CancellablePromise<T> {
public readonly promise: Promise<T>;
private abortController = new AbortController();
private teardowns: Array<() => void> = [];
constructor(
executor: (
resolve: (value: T) => void,
reject: (reason: unknown) => void,
signal: AbortSignal,
onTeardown: (fn: () => void) => void
) => void
) {
this.promise = new Promise<T>((resolve, reject) => {
// If already cancelled before construction finishes:
if (this.abortController.signal.aborted) {
reject(new CancelledError("Operation cancelled"));
return;
}
// Listen for cancellation:
this.abortController.signal.addEventListener("abort", () => {
reject(new CancelledError("Operation cancelled"));
this.runTeardowns();
});
executor(
(value) => {
if (!this.abortController.signal.aborted) resolve(value);
},
(reason) => {
if (!this.abortController.signal.aborted) reject(reason);
},
this.abortController.signal,
(fn) => this.teardowns.push(fn)
);
});
}
cancel(): void {
this.abortController.abort();
}
get signal(): AbortSignal {
return this.abortController.signal;
}
private runTeardowns(): void {
for (const fn of this.teardowns) {
try { fn(); } catch {}
}
this.teardowns = [];
}
// Chain: propagate cancellation to child promises
then<U>(
onFulfilled: (value: T) => U | Promise<U>
): CancellablePromise<U> {
const child = new CancellablePromise<U>((resolve, reject, signal) => {
this.promise.then(
(value) => {
if (signal.aborted) return;
try {
const result = onFulfilled(value);
if (result instanceof Promise) {
result.then(resolve, reject);
} else {
resolve(result);
}
} catch (e) {
reject(e);
}
},
reject
);
});
// Cancelling the child also cancels the parent operation
// (structured concurrency: children never outlive their parent):
child.signal.addEventListener("abort", () => this.cancel());
return child;
}
}
class CancelledError extends Error {
readonly cancelled = true;
constructor(message = "Cancelled") {
super(message);
this.name = "CancelledError";
}
}
// Usage:
const operation = new CancellablePromise((resolve, reject, signal, onTeardown) => {
const timer = setTimeout(() => {
resolve(fetch("/api/data", { signal }).then((r) => r.json()));
}, 1000);
// Register cleanup:
onTeardown(() => clearTimeout(timer));
});
// Later:
operation.cancel();
// → Timer cleared, fetch aborted, promise rejected with CancelledError.
Mutex on SharedArrayBuffer with Atomics
// True cross-worker mutual exclusion using shared memory.
// This is REAL locking — not cooperative async scheduling.
// Main Thread Worker A Worker B
// ┌────────┐ ┌────────┐ ┌────────┐
// │ Create │ │ │ │ │
// │ SAB │──share──▶ │ lock() │ │ lock() │
// │ │──share──▶ │ ... │ │ SPINS │
// │ │ │unlock()│ │ ... │
// │ │ │ │ │ (wakes)│
// │ │ │ │ │ lock() │
// └────────┘ └────────┘ └────────┘
class AtomicMutex {
// Uses a single Int32 in shared memory:
// 0 = unlocked, 1 = locked
private lockIndex: number;
constructor(
private buffer: SharedArrayBuffer,
private view: Int32Array,
lockOffset: number = 0
) {
this.lockIndex = lockOffset;
}
static create(lockOffset: number = 0): {
mutex: AtomicMutex;
buffer: SharedArrayBuffer;
} {
const buffer = new SharedArrayBuffer(4 * (lockOffset + 1));
const view = new Int32Array(buffer);
return {
mutex: new AtomicMutex(buffer, view, lockOffset),
buffer,
};
}
static fromBuffer(
buffer: SharedArrayBuffer,
lockOffset: number = 0
): AtomicMutex {
const view = new Int32Array(buffer);
return new AtomicMutex(buffer, view, lockOffset);
}
lock(): void {
while (true) {
// Try to atomically set 0 → 1:
// compareExchange returns the OLD value.
// If it was 0, we successfully locked it.
const old = Atomics.compareExchange(this.view, this.lockIndex, 0, 1);
if (old === 0) {
// We got the lock!
return;
}
// Lock is held by someone else. Wait until notified:
// Atomics.wait blocks the thread (NOT the event loop).
// This is why it only works in Workers, not the main thread.
Atomics.wait(this.view, this.lockIndex, 1);
// When we wake up, try again (another waiter may have grabbed it).
}
}
unlock(): void {
// Set back to 0 (unlocked):
const old = Atomics.compareExchange(this.view, this.lockIndex, 1, 0);
if (old !== 1) {
throw new Error("Mutex was not locked");
}
// Wake up ONE waiting thread:
Atomics.notify(this.view, this.lockIndex, 1);
}
tryLock(): boolean {
// Non-blocking attempt:
return Atomics.compareExchange(this.view, this.lockIndex, 0, 1) === 0;
}
}
// Worker code:
// const mutex = AtomicMutex.fromBuffer(sharedBuffer, 0);
// mutex.lock();
// try {
// // Critical section — only one worker runs this at a time
// // Read/write shared data in the SharedArrayBuffer
// } finally {
// mutex.unlock();
// }
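To see the lock doing real work, here is a hypothetical Node.js sketch (using `worker_threads`, not part of the original) that races two workers over a shared counter. Each worker uses the same CAS + wait/notify protocol as AtomicMutex, so no increments are lost:

```typescript
// Hypothetical Node.js demo: two workers increment a shared counter 10,000
// times each under the same CAS + wait/notify protocol as AtomicMutex.
import { Worker } from "node:worker_threads";

const shared = new SharedArrayBuffer(8); // int32[0] = lock word, int32[1] = counter
const workerSource = `
const { workerData, parentPort } = require("node:worker_threads");
const view = new Int32Array(workerData);
for (let i = 0; i < 10000; i++) {
  // lock(): CAS 0 → 1; sleep via Atomics.wait while the lock is held
  while (Atomics.compareExchange(view, 0, 0, 1) !== 0) {
    Atomics.wait(view, 0, 1);
  }
  view[1]++; // critical section: plain (non-atomic) increment is now safe
  // unlock(): release the lock and wake one waiter
  Atomics.store(view, 0, 0);
  Atomics.notify(view, 0, 1);
}
parentPort.postMessage("done");
`;

const runWorker = () =>
  new Promise<void>((resolve, reject) => {
    const w = new Worker(workerSource, { eval: true, workerData: shared });
    w.on("message", () => resolve());
    w.on("error", reject);
  });

const total = Promise.all([runWorker(), runWorker()]).then(
  () => new Int32Array(shared)[1]
);
total.then((n) => console.log(n)); // 20000 — every increment survived
```

Without the lock, the plain `view[1]++` (a read-modify-write) would lose updates when both workers interleave.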
Structured Concurrency: CSP-Inspired Patterns
// Communicating Sequential Processes (CSP):
// Instead of sharing memory, pass messages through CHANNELS.
// Go uses goroutines + channels. We can build the same pattern.
class Channel<T> {
private buffer: T[] = [];
private senders: Array<{ value: T; resolve: () => void }> = [];
private receivers: Array<{ resolve: (value: T) => void }> = [];
private closed = false;
private readonly capacity: number;
constructor(capacity: number = 0) {
this.capacity = capacity; // 0 = unbuffered (rendezvous)
}
async send(value: T): Promise<void> {
if (this.closed) throw new Error("Channel is closed");
// If a receiver is waiting, hand off directly:
if (this.receivers.length > 0) {
const receiver = this.receivers.shift()!;
receiver.resolve(value);
return;
}
// If buffer has space, buffer it:
if (this.buffer.length < this.capacity) {
this.buffer.push(value);
return;
}
// Otherwise, block until a receiver arrives:
return new Promise<void>((resolve) => {
this.senders.push({ value, resolve });
});
}
async receive(): Promise<T | undefined> {
// If buffer has data, return immediately:
if (this.buffer.length > 0) {
const value = this.buffer.shift()!;
// If a sender is waiting, move their value into buffer:
if (this.senders.length > 0) {
const sender = this.senders.shift()!;
this.buffer.push(sender.value);
sender.resolve();
}
return value;
}
// If a sender is waiting (unbuffered channel), hand off:
if (this.senders.length > 0) {
const sender = this.senders.shift()!;
sender.resolve();
return sender.value;
}
// If channel is closed, return undefined:
if (this.closed) return undefined;
// Block until a sender arrives:
return new Promise<T | undefined>((resolve) => {
this.receivers.push({ resolve: resolve as (value: T) => void });
});
}
close(): void {
this.closed = true;
// Wake up all receivers with undefined:
for (const receiver of this.receivers) {
receiver.resolve(undefined as any);
}
this.receivers = [];
// Note: senders still blocked on a full buffer will wait forever;
// a production channel would also reject pending senders here.
}
// Async iterator support:
async *[Symbol.asyncIterator](): AsyncGenerator<T> {
while (true) {
const value = await this.receive();
if (value === undefined && this.closed) return;
yield value as T;
}
}
}
// Usage: Producer-consumer with backpressure
async function pipeline() {
const urls = new Channel<string>(10); // Buffered: up to 10 queued
const results = new Channel<object>(5);
// Producer: discovers URLs
const producer = async () => {
for (let page = 1; page <= 100; page++) {
await urls.send(`/api/items?page=${page}`);
// If buffer is full, this BLOCKS until a consumer reads.
// Natural backpressure — no unbounded queue growth.
}
urls.close();
};
// Consumers: fetch URLs (3 concurrent)
const consumer = async () => {
for await (const url of urls) {
const data = await fetch(url).then((r) => r.json());
await results.send(data);
}
};
// Aggregator: collects results
const aggregator = async () => {
const all: object[] = [];
for await (const result of results) {
all.push(result);
if (all.length >= 100) break;
}
return all;
};
// Run pipeline:
const [, , , , aggregated] = await Promise.all([
producer(),
consumer(),
consumer(),
consumer(),
aggregator(),
]);
return aggregated;
}
Promise.scheduler: Priority Task Scheduling
// A scheduler that runs async tasks with priority ordering
// and concurrency limits.
type Priority = "critical" | "high" | "normal" | "low" | "idle";
interface ScheduledTask<T> {
fn: () => Promise<T>;
priority: Priority;
resolve: (value: T) => void;
reject: (reason: unknown) => void;
signal?: AbortSignal;
}
class PromiseScheduler {
private queues: Record<Priority, ScheduledTask<any>[]> = {
critical: [],
high: [],
normal: [],
low: [],
idle: [],
};
private running = 0;
private readonly maxConcurrency: number;
private readonly priorityOrder: Priority[] = [
"critical", "high", "normal", "low", "idle"
];
constructor(maxConcurrency: number = 4) {
this.maxConcurrency = maxConcurrency;
}
schedule<T>(
fn: () => Promise<T>,
priority: Priority = "normal",
signal?: AbortSignal
): Promise<T> {
return new Promise<T>((resolve, reject) => {
if (signal?.aborted) {
reject(new Error("Aborted"));
return;
}
const task: ScheduledTask<T> = { fn, priority, resolve, reject, signal };
this.queues[priority].push(task);
signal?.addEventListener("abort", () => {
// Remove from queue if not yet started:
const queue = this.queues[priority];
const idx = queue.indexOf(task);
if (idx >= 0) {
queue.splice(idx, 1);
reject(new Error("Aborted"));
}
});
this.drain();
});
}
private drain(): void {
while (this.running < this.maxConcurrency) {
const task = this.pickNext();
if (!task) break;
this.running++;
task
.fn()
.then(task.resolve, task.reject)
.finally(() => {
this.running--;
this.drain(); // Check for more work
});
}
}
private pickNext(): ScheduledTask<any> | null {
for (const priority of this.priorityOrder) {
const queue = this.queues[priority];
while (queue.length > 0) {
const task = queue.shift()!;
// Skip cancelled tasks:
if (task.signal?.aborted) {
task.reject(new Error("Aborted"));
continue;
}
return task;
}
}
return null;
}
}
// Usage:
const scheduler = new PromiseScheduler(3);
// Critical: auth token refresh
scheduler.schedule(() => refreshToken(), "critical");
// High: data fetch for current view
scheduler.schedule(() => fetchDashboardData(), "high");
// Normal: prefetch next page
scheduler.schedule(() => prefetchNextPage(), "normal");
// Low: analytics beacon
scheduler.schedule(() => sendAnalytics(), "low");
// Idle: cache warming
scheduler.schedule(() => warmCache(), "idle");
Interview Q&A
Q: How can you have concurrency bugs in single-threaded JavaScript?
A: Every await is a yield point where other async operations can interleave. Between an await that reads data and an await that writes data, another async function can read the same stale value — a classic TOCTOU (time-of-check-to-time-of-use) race. JavaScript's concurrency is cooperative, not preemptive: code runs uninterrupted between yield points, but logical races happen at every await boundary. This is why you need Mutexes for exclusive access to shared async state, even in a single-threaded environment.
Q: What's the difference between a Mutex, Semaphore, and ReadWriteLock?
A: A Mutex allows exactly 1 holder — mutual exclusion for a critical section. A Semaphore allows N holders — used for rate limiting (e.g., max 5 concurrent API calls). A ReadWriteLock distinguishes between readers and writers: multiple readers can proceed simultaneously (shared access), but a writer requires exclusive access (blocks all readers and other writers). Choose Mutex for simple critical sections, Semaphore for concurrency control, and ReadWriteLock when reads vastly outnumber writes and read-read parallelism matters.
Q: Why can't Atomics.wait() work on the main thread?
A: Atomics.wait() blocks the calling thread — it literally pauses execution until the condition is met. Blocking the main thread would freeze the UI, prevent event handling, and violate the browser's responsiveness contract. The main thread must remain available to process events, run animations, and respond to user input. Workers can block because they don't own the UI. On the main thread, use Atomics.waitAsync() (returns a Promise) or cooperative locking patterns that await instead of blocking.
Q: Explain structured concurrency and why it matters.
A: Structured concurrency ensures that child async operations never outlive their parent scope — when a scope exits, all operations spawned within it are cancelled and cleaned up. Without it, fire-and-forget promises can leak, continue running after navigation, or hold resources indefinitely. CSP (Communicating Sequential Processes) is the formal model: instead of sharing mutable state, concurrent processes communicate through channels with backpressure. Go's goroutines+channels implement CSP. In JavaScript, we build it with Channel classes + async iterators. The key benefit: bounded resource usage and predictable cleanup, eliminating an entire class of async memory leaks.