Adaptive Load Shedding & Backpressure: Protecting Systems Under Stress
Load shedding and backpressure are complementary techniques for handling system overload. Load shedding deliberately drops requests when capacity is exceeded, while backpressure propagates slowdown signals upstream to reduce incoming load. Together, they prevent cascading failures, protect downstream dependencies, and maintain service quality for admitted requests. Understanding their internals is essential for building systems that degrade gracefully under stress.
Why This Matters in Real-World Projects
Every production system eventually faces overload conditions:
- Traffic Spikes: Viral content, marketing campaigns, or breaking news can multiply traffic 10-100x in minutes
- Dependency Failures: When a database or downstream service slows down, requests pile up, consuming memory and threads
- Cascading Failures: One overloaded service causes timeouts in callers, which then overload, creating system-wide collapse
- Resource Exhaustion: Without load shedding, systems consume all memory, file descriptors, or connections before crashing
- Quality vs Quantity: Serving 1000 requests slowly is often worse than serving 500 requests quickly and rejecting 500
The Overload Death Spiral
Understanding how systems fail under load reveals why load shedding is essential:
┌────────────────────────────────────────────────────────────────┐
│ The Overload Death Spiral │
├────────────────────────────────────────────────────────────────┤
│ │
│ Normal Load │
│ ─────────────────────────────────────────────────────────── │
│ Requests: 1000/s, Latency: 50ms, Success: 100% │
│ Resources: CPU 40%, Memory 60%, Connections 500/1000 │
│ │
│ Traffic Spike (2x) │
│ ─────────────────────────────────────────────────────────── │
│ Requests: 2000/s │
│ ├── Queue builds up (waiting threads) │
│ ├── Latency increases: 50ms → 200ms │
│ ├── Memory increases (queued requests) │
│ └── Still functional but degraded │
│ │
│ Sustained Overload (queues grow) │
│ ─────────────────────────────────────────────────────────── │
│ ├── GC pressure increases (more objects in memory) │
│ ├── Context switching overhead increases │
│ ├── Connection pool exhausted → requests wait │
│ ├── Client timeouts → retries → MORE load │
│ └── Latency: 200ms → 5s → timeout │
│ │
│ Cascading Failure │
│ ─────────────────────────────────────────────────────────── │
│ ├── Health checks fail → removed from load balancer │
│ ├── Traffic redistributed to remaining servers │
│ ├── Remaining servers now face 3x normal load │
│ ├── They fail too → domino effect │
│ └── Complete system outage │
│ │
└────────────────────────────────────────────────────────────────┘
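The retry feedback loop in the spike stage can be made concrete with a toy fixed-point model (illustrative numbers only, not from any real system): when a fraction of timed-out requests is retried, offered load feeds back on itself.

```python
def effective_load(offered: float, capacity: float, retry_fraction: float) -> float:
    """Toy model of retry amplification: requests beyond `capacity` time
    out, and `retry_fraction` of them are re-submitted, adding load.
    Iterates to the fixed point load = offered + retry_fraction * overflow."""
    load = offered
    for _ in range(1000):
        overflow = max(0.0, load - capacity)
        load = offered + overflow * retry_fraction
    return load

# A 2x spike with half of all timeouts retried settles at 3x capacity:
print(effective_load(offered=2000, capacity=1000, retry_fraction=0.5))  # → 3000.0
# With retry_fraction >= 1 the series diverges: the death spiral.
```

The geometric feedback is why shedding early is cheaper than absorbing the retries later.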
Load Shedding Fundamentals
Load shedding deliberately rejects requests to protect system capacity:
from dataclasses import dataclass
from typing import Optional, Callable
import time
import threading
from collections import deque
@dataclass
class LoadShedderConfig:
# Maximum concurrent requests
max_concurrent: int = 100
# Maximum queue depth
max_queue_depth: int = 50
# Maximum request age in queue (seconds)
max_queue_time: float = 5.0
# Target latency (p99)
target_latency_ms: float = 100.0
# CPU threshold for shedding
cpu_threshold: float = 0.85
# Memory threshold for shedding
memory_threshold: float = 0.90
class AdaptiveLoadShedder:
"""
Adaptive load shedding based on multiple signals:
- Concurrency limits
- Queue depth
- Latency targets
- Resource utilization
"""
def __init__(self, config: LoadShedderConfig):
self.config = config
self.current_concurrent = 0
self.queue = deque()
self.lock = threading.Lock()
# Metrics for adaptive behavior
self.recent_latencies = deque(maxlen=100)
self.recent_shed_rate = deque(maxlen=100)
# AIMD (Additive Increase, Multiplicative Decrease) for concurrency
self.dynamic_limit = config.max_concurrent
def should_shed(self, request: 'Request') -> tuple[bool, str]:
"""
Determine if request should be shed.
Returns (should_shed, reason).
"""
with self.lock:
# Check 1: Hard concurrency limit
if self.current_concurrent >= self.dynamic_limit:
# Can we queue it?
if len(self.queue) >= self.config.max_queue_depth:
return True, "queue_full"
                # There is queue room: the caller is expected to park the
                # request on self.queue for later processing rather than
                # run it immediately
                return False, "queued"
# Check 2: Resource utilization
cpu_usage = self._get_cpu_usage()
if cpu_usage > self.config.cpu_threshold:
return True, f"cpu_high:{cpu_usage:.2f}"
memory_usage = self._get_memory_usage()
if memory_usage > self.config.memory_threshold:
return True, f"memory_high:{memory_usage:.2f}"
# Check 3: Priority-based shedding under moderate load
if self.current_concurrent > self.dynamic_limit * 0.8:
if not self._is_high_priority(request):
return True, "low_priority_shed"
return False, "admitted"
def admit(self, request: 'Request') -> bool:
"""
Attempt to admit a request for processing.
Returns True if admitted, False if shed.
"""
should_shed, reason = self.should_shed(request)
if should_shed:
self._record_shed(reason)
return False
with self.lock:
self.current_concurrent += 1
return True
def release(self, request: 'Request', latency_ms: float):
"""
Release a request slot and record latency.
"""
with self.lock:
self.current_concurrent -= 1
self.recent_latencies.append(latency_ms)
# Adjust dynamic limit based on latency
self._adjust_limit(latency_ms)
    def _adjust_limit(self, latency_ms: float):
        """
        AIMD-style adjustment of the concurrency limit:
        additive increase when latency is good,
        multiplicative decrease when latency is bad.
        Called from release() with self.lock already held, so it must not
        re-acquire the lock (threading.Lock is not reentrant).
        """
        if latency_ms < self.config.target_latency_ms:
            # Additive increase: slowly grow capacity
            self.dynamic_limit = min(
                self.config.max_concurrent,
                self.dynamic_limit + 1
            )
        elif latency_ms > self.config.target_latency_ms * 2:
            # Multiplicative decrease: quickly reduce capacity
            self.dynamic_limit = max(
                10,  # Minimum limit
                int(self.dynamic_limit * 0.75)
            )
def _is_high_priority(self, request: 'Request') -> bool:
"""
Determine request priority for selective shedding.
"""
# Health checks are always high priority
if request.path == '/health':
return True
# Authenticated users get priority
if request.user_id:
return True
# Critical business paths
if request.path.startswith('/api/payments'):
return True
return False
def _get_cpu_usage(self) -> float:
import psutil
return psutil.cpu_percent() / 100.0
def _get_memory_usage(self) -> float:
import psutil
return psutil.virtual_memory().percent / 100.0
def _record_shed(self, reason: str):
self.recent_shed_rate.append((time.time(), reason))
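The AIMD policy in `_adjust_limit` is easiest to follow in isolation. A minimal sketch, using the same constants as the config defaults (100ms target, +1 increase, x0.75 decrease):

```python
def aimd_step(limit: int, latency_ms: float, target_ms: float = 100.0,
              max_limit: int = 100, min_limit: int = 10) -> int:
    """One AIMD adjustment: +1 when under target, x0.75 when over 2x target."""
    if latency_ms < target_ms:
        return min(max_limit, limit + 1)
    if latency_ms > target_ms * 2:
        return max(min_limit, int(limit * 0.75))
    return limit  # dead band between 1x and 2x target: hold steady

limit = 100
for _ in range(3):                  # sustained bad latency collapses the limit fast
    limit = aimd_step(limit, 250)
print(limit)  # → 42  (100 -> 75 -> 56 -> 42)
for _ in range(10):                 # recovery is deliberately slow: +1 per good sample
    limit = aimd_step(limit, 50)
print(limit)  # → 52
```

The asymmetry is the point: capacity drops in a few bad samples but is rebuilt one request at a time, which damps oscillation.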
Queue-Based Load Shedding
Intelligent queue management prevents old requests from consuming capacity:
import heapq
import time
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional
@dataclass(order=True)
class PrioritizedRequest:
priority: int
timestamp: float = field(compare=False)
request: Any = field(compare=False)
deadline: float = field(compare=False)
class DeadlineAwareQueue:
"""
Priority queue that automatically expires old requests.
Inspired by gRPC deadline propagation.
"""
def __init__(self, max_size: int = 1000, default_deadline: float = 5.0):
self.max_size = max_size
self.default_deadline = default_deadline
self.queue: list[PrioritizedRequest] = []
self.lock = asyncio.Lock()
async def enqueue(self, request: Any, priority: int = 5,
deadline: Optional[float] = None) -> bool:
"""
Enqueue a request with priority and deadline.
Returns False if queue is full.
"""
async with self.lock:
# Expire old entries first
await self._expire_old_entries()
            if len(self.queue) >= self.max_size:
                # Make room by evicting the lowest-priority entry if the
                # new request outranks it. Stored priorities are negated,
                # so the *largest* stored value is the lowest priority.
                worst = max(range(len(self.queue)),
                            key=lambda i: self.queue[i].priority)
                if -priority < self.queue[worst].priority:
                    self.queue.pop(worst)
                    heapq.heapify(self.queue)
                else:
                    return False  # Shed this request
deadline = deadline or (time.time() + self.default_deadline)
entry = PrioritizedRequest(
priority=-priority, # Negative for max-heap behavior
timestamp=time.time(),
request=request,
deadline=deadline
)
heapq.heappush(self.queue, entry)
return True
async def dequeue(self) -> Optional[Any]:
"""
Dequeue highest priority non-expired request.
"""
async with self.lock:
now = time.time()
            while self.queue:
                entry = heapq.heappop(self.queue)
                # Skip expired entries (count them as drops)
                if entry.deadline < now:
                    self.stats.drops += 1
                    continue
                # Not enough remaining deadline to do useful work: drop it too
                remaining = entry.deadline - now
                if remaining < 0.1:  # Less than 100ms left
                    self.stats.drops += 1
                    continue
                return entry.request
return None
async def _expire_old_entries(self):
"""
Remove expired entries from queue.
"""
now = time.time()
self.queue = [e for e in self.queue if e.deadline > now]
heapq.heapify(self.queue)
# Usage in HTTP server
class LoadSheddingServer:
def __init__(self):
self.queue = DeadlineAwareQueue(max_size=500, default_deadline=10.0)
self.workers = []
async def handle_request(self, request):
# Extract deadline from headers (gRPC-style)
deadline = request.headers.get('X-Deadline')
if deadline:
deadline = float(deadline)
else:
deadline = time.time() + 30.0 # Default 30s
# Calculate priority
priority = self._calculate_priority(request)
# Attempt to enqueue
if not await self.queue.enqueue(request, priority, deadline):
return Response(status=503, body='Service Unavailable')
# Request will be processed by worker pool
return await request.future
def _calculate_priority(self, request) -> int:
"""
Higher number = higher priority.
"""
base_priority = 5
# Boost for authenticated users
if request.user_id:
base_priority += 2
# Boost for important paths
if '/checkout' in request.path:
base_priority += 3
# Reduce for known heavy operations
if '/export' in request.path or '/report' in request.path:
base_priority -= 2
return max(1, min(10, base_priority))
Backpressure: Propagating Load Signals
Backpressure communicates capacity constraints upstream:
┌────────────────────────────────────────────────────────────────┐
│ Backpressure Propagation │
├────────────────────────────────────────────────────────────────┤
│ │
│ Without Backpressure: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Client │───▶│ Gateway │───▶│ Service │───▶│ DB │ │
│ │ 1000/s │ │ Fast │ │ Fast │ │ Slow │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │ │
│ │ │ ┌────┴────┐ │ │
│ │ │ │ Queue │ │ │
│ │ │ │ grows │◀────────┘ │
│ │ │ │ forever │ (DB can't │
│ │ │ └────┬────┘ keep up) │
│ │ │ │ │
│ │ │ ┌────▼────┐ │
│ │ │ │ OOM │ │
│ │ │ │ Crash │ │
│ │ │ └─────────┘ │
│ │
│ With Backpressure: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Client │◀───│ Gateway │◀───│ Service │◀───│ DB │ │
│ │ Slows │ │ Slows │ │ Bounded │ │ Slow │ │
│ │ Down │ │ Down │ │ Queue │ │ │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │ │
│ │◀─────────────┼──────────────┼──────────────┘ │
│ │ Backpressure signal propagates upstream │
│ │ │
└────────────────────────────────────────────────────────────────┘
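Before looking at a fuller implementation, the bounded-queue mechanism in the diagram can be seen in a few lines of Python: `asyncio.Queue(maxsize=...)` already provides blocking-put backpressure, and `put_nowait` is the non-blocking "TrySend" probe.

```python
import asyncio

async def demo() -> list[str]:
    events = []
    # put() blocks when the queue is full; that blocking is the
    # backpressure signal paced back to the producer
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    await q.put(1)
    await q.put(2)
    events.append(f"full={q.full()}")
    try:
        q.put_nowait(3)            # non-blocking probe, TrySend-style
    except asyncio.QueueFull:
        events.append("producer must wait")
    q.get_nowait()                 # downstream drains one item
    q.put_nowait(3)                # room again, send succeeds
    events.append(f"depth={q.qsize()}")
    return events

print(asyncio.run(demo()))
# → ['full=True', 'producer must wait', 'depth=2']
```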
Implementing Backpressure with Bounded Channels
package main
import (
"context"
"errors"
"sync"
"time"
)
// BoundedChannel implements backpressure via bounded queue
type BoundedChannel[T any] struct {
ch chan T
capacity int
mu sync.RWMutex
closed bool
}
func NewBoundedChannel[T any](capacity int) *BoundedChannel[T] {
return &BoundedChannel[T]{
ch: make(chan T, capacity),
capacity: capacity,
}
}
// Send with backpressure - blocks when full
func (bc *BoundedChannel[T]) Send(ctx context.Context, item T) error {
bc.mu.RLock()
if bc.closed {
bc.mu.RUnlock()
return errors.New("channel closed")
}
bc.mu.RUnlock()
select {
case bc.ch <- item:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// TrySend - non-blocking send, returns false if would block
func (bc *BoundedChannel[T]) TrySend(item T) bool {
bc.mu.RLock()
if bc.closed {
bc.mu.RUnlock()
return false
}
bc.mu.RUnlock()
select {
case bc.ch <- item:
return true
default:
return false
}
}
// SendWithTimeout - send with deadline
func (bc *BoundedChannel[T]) SendWithTimeout(item T, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return bc.Send(ctx, item)
}
// Receive with context
func (bc *BoundedChannel[T]) Receive(ctx context.Context) (T, error) {
var zero T
select {
case item, ok := <-bc.ch:
if !ok {
return zero, errors.New("channel closed")
}
return item, nil
case <-ctx.Done():
return zero, ctx.Err()
}
}
// Len returns current queue depth
func (bc *BoundedChannel[T]) Len() int {
return len(bc.ch)
}
// IsFull returns true if at capacity
func (bc *BoundedChannel[T]) IsFull() bool {
return len(bc.ch) >= bc.capacity
}
// Pressure returns queue utilization (0.0 to 1.0)
func (bc *BoundedChannel[T]) Pressure() float64 {
return float64(len(bc.ch)) / float64(bc.capacity)
}
// Pipeline with backpressure
type Pipeline struct {
stages []*BoundedChannel[*Request]
workers int
wg sync.WaitGroup
}
func NewPipeline(stageCount, capacity, workers int) *Pipeline {
stages := make([]*BoundedChannel[*Request], stageCount)
for i := range stages {
stages[i] = NewBoundedChannel[*Request](capacity)
}
return &Pipeline{
stages: stages,
workers: workers,
}
}
func (p *Pipeline) Start(ctx context.Context) {
// Start workers for each stage
for stageIdx := range p.stages {
for w := 0; w < p.workers; w++ {
p.wg.Add(1)
go p.runWorker(ctx, stageIdx)
}
}
}
func (p *Pipeline) runWorker(ctx context.Context, stageIdx int) {
defer p.wg.Done()
input := p.stages[stageIdx]
for {
req, err := input.Receive(ctx)
if err != nil {
return
}
		// Process this stage (processStage and the Request type are
		// application-defined)
		result := p.processStage(stageIdx, req)
// If not last stage, send to next (with backpressure)
if stageIdx < len(p.stages)-1 {
nextStage := p.stages[stageIdx+1]
// This will block if next stage is at capacity
// Backpressure propagates upstream
if err := nextStage.Send(ctx, result); err != nil {
// Context cancelled or channel closed
return
}
} else {
// Final stage - send response
req.respond(result)
}
}
}
func (p *Pipeline) Submit(ctx context.Context, req *Request) error {
// Submit to first stage - blocks if pipeline is backed up
return p.stages[0].Send(ctx, req)
}
func (p *Pipeline) Pressure() []float64 {
pressures := make([]float64, len(p.stages))
for i, stage := range p.stages {
pressures[i] = stage.Pressure()
}
return pressures
}
Token Bucket for Rate-Based Backpressure
package main
import (
"sync"
"time"
)
// TokenBucket implements rate limiting with burst capacity
type TokenBucket struct {
capacity float64
tokens float64
refillRate float64 // tokens per second
lastRefill time.Time
mu sync.Mutex
}
func NewTokenBucket(capacity float64, refillRate float64) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
refillRate: refillRate,
lastRefill: time.Now(),
}
}
// TryAcquire attempts to acquire n tokens without blocking
func (tb *TokenBucket) TryAcquire(n float64) bool {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
if tb.tokens >= n {
tb.tokens -= n
return true
}
return false
}
// Acquire blocks until tokens are available
func (tb *TokenBucket) Acquire(n float64) time.Duration {
	tb.mu.Lock()
	tb.refill()
	if tb.tokens >= n {
		tb.tokens -= n
		tb.mu.Unlock()
		return 0
	}
	// Reserve the deficit now, then sleep *outside* the lock so that
	// other callers are not blocked while we wait for the refill
	deficit := n - tb.tokens
	tb.tokens = 0
	tb.lastRefill = time.Now()
	tb.mu.Unlock()
	waitTime := time.Duration(deficit / tb.refillRate * float64(time.Second))
	time.Sleep(waitTime)
	return waitTime
}
func (tb *TokenBucket) refill() {
now := time.Now()
elapsed := now.Sub(tb.lastRefill).Seconds()
// Add tokens based on elapsed time
tb.tokens = min(tb.capacity, tb.tokens + elapsed * tb.refillRate)
tb.lastRefill = now
}
// Available returns current token count
func (tb *TokenBucket) Available() float64 {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.refill()
return tb.tokens
}
// AdaptiveRateLimiter adjusts rate based on downstream signals
type AdaptiveRateLimiter struct {
bucket *TokenBucket
baseRate float64
currentRate float64
minRate float64
maxRate float64
mu sync.RWMutex
// Vegas-style congestion control
baseRTT time.Duration
recentRTTs []time.Duration
}
func NewAdaptiveRateLimiter(baseRate, minRate, maxRate float64) *AdaptiveRateLimiter {
return &AdaptiveRateLimiter{
bucket: NewTokenBucket(baseRate*2, baseRate),
baseRate: baseRate,
currentRate: baseRate,
minRate: minRate,
maxRate: maxRate,
recentRTTs: make([]time.Duration, 0, 100),
}
}
// RecordRTT records a round-trip time and adjusts rate
func (arl *AdaptiveRateLimiter) RecordRTT(rtt time.Duration) {
arl.mu.Lock()
defer arl.mu.Unlock()
arl.recentRTTs = append(arl.recentRTTs, rtt)
if len(arl.recentRTTs) > 100 {
arl.recentRTTs = arl.recentRTTs[1:]
}
// Update base RTT (minimum observed)
if arl.baseRTT == 0 || rtt < arl.baseRTT {
arl.baseRTT = rtt
}
// Vegas-style adjustment
arl.adjustRate()
}
func (arl *AdaptiveRateLimiter) adjustRate() {
if len(arl.recentRTTs) < 10 || arl.baseRTT == 0 {
return
}
// Calculate average recent RTT
var sum time.Duration
for _, rtt := range arl.recentRTTs[len(arl.recentRTTs)-10:] {
sum += rtt
}
avgRTT := sum / 10
// Vegas diff: expected throughput vs actual
// diff = (currentRate * baseRTT / avgRTT) - currentRate
ratio := float64(arl.baseRTT) / float64(avgRTT)
if ratio > 0.9 {
// RTT close to base - increase rate
arl.currentRate = min(arl.maxRate, arl.currentRate * 1.1)
} else if ratio < 0.5 {
// RTT doubled - significant congestion
arl.currentRate = max(arl.minRate, arl.currentRate * 0.5)
} else if ratio < 0.7 {
// Moderate congestion
arl.currentRate = max(arl.minRate, arl.currentRate * 0.9)
}
// Update bucket rate
arl.bucket = NewTokenBucket(arl.currentRate*2, arl.currentRate)
}
func (arl *AdaptiveRateLimiter) TryAcquire() bool {
return arl.bucket.TryAcquire(1)
}
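The `ResilientClient` example in the circuit breaker section below calls a rate limiter from Python. A minimal single-threaded port of the token bucket above might look like this (a sketch; the name `PyTokenBucket` is ours, and it omits locking):

```python
import time

class PyTokenBucket:
    """Minimal Python port of the Go TokenBucket above (hypothetical
    helper; single-threaded, so no lock is taken)."""

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate        # tokens per second
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now

    def try_acquire(self, n: float = 1.0) -> bool:
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

bucket = PyTokenBucket(capacity=2, refill_rate=10)  # 10 tokens/s, burst of 2
print(bucket.try_acquire(), bucket.try_acquire(), bucket.try_acquire())
# → True True False  (burst exhausted; the third call must wait for refill)
```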
Circuit Breaker Integration
Circuit breakers complement load shedding by stopping calls to failing dependencies:
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional
import aiohttp
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject all calls
HALF_OPEN = "half_open" # Testing if recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Failures before opening
success_threshold: int = 3 # Successes to close from half-open
reset_timeout: float = 30.0 # Seconds before half-open
half_open_max_calls: int = 3 # Max concurrent calls in half-open
class CircuitBreaker:
    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
self.name = name
self.config = config or CircuitBreakerConfig()
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
self.lock = asyncio.Lock()
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""
Execute function with circuit breaker protection.
"""
        # Hold the lock only for the admission check; _on_success and
        # _on_failure re-acquire it, and asyncio.Lock is not reentrant
        async with self.lock:
            if not await self._can_execute():
                raise CircuitOpenError(self.name, self._time_until_reset())
try:
result = await func(*args, **kwargs)
await self._on_success()
return result
except Exception as e:
await self._on_failure()
raise
async def _can_execute(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# Check if reset timeout has passed
if time.time() - self.last_failure_time >= self.config.reset_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
self.success_count = 0
return True
return False
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.config.half_open_max_calls:
return False
self.half_open_calls += 1
return True
return False
async def _on_success(self):
async with self.lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
elif self.state == CircuitState.CLOSED:
# Reset failure count on success
self.failure_count = max(0, self.failure_count - 1)
async def _on_failure(self):
async with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
# Any failure in half-open reopens circuit
self.state = CircuitState.OPEN
elif self.state == CircuitState.CLOSED:
if self.failure_count >= self.config.failure_threshold:
self.state = CircuitState.OPEN
def _time_until_reset(self) -> float:
elapsed = time.time() - self.last_failure_time
return max(0, self.config.reset_timeout - elapsed)
class CircuitOpenError(Exception):
def __init__(self, circuit_name: str, retry_after: float):
self.circuit_name = circuit_name
self.retry_after = retry_after
super().__init__(f"Circuit {circuit_name} is open, retry after {retry_after:.1f}s")
# Combined load shedding + circuit breaker
class ResilientClient:
def __init__(self, base_url: str):
self.base_url = base_url
self.circuit = CircuitBreaker("api", CircuitBreakerConfig(
failure_threshold=5,
reset_timeout=30.0
))
        # Assumes a Python port of the Go AdaptiveRateLimiter shown
        # earlier, exposing try_acquire() and record_rtt()
        self.rate_limiter = AdaptiveRateLimiter(100, 10, 500)
async def request(self, path: str, **kwargs) -> Response:
# Layer 1: Rate limiting with backpressure
if not self.rate_limiter.try_acquire():
# Could block here, or reject
raise RateLimitExceeded()
# Layer 2: Circuit breaker
start_time = time.time()
try:
result = await self.circuit.call(
self._do_request, path, **kwargs
)
# Record RTT for adaptive rate limiting
rtt = time.time() - start_time
self.rate_limiter.record_rtt(rtt)
return result
except CircuitOpenError:
# Fast fail - don't even try
raise
except Exception as e:
rtt = time.time() - start_time
self.rate_limiter.record_rtt(rtt * 2) # Penalize failures
raise
async def _do_request(self, path: str, **kwargs) -> Response:
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}{path}",
timeout=aiohttp.ClientTimeout(total=10),
**kwargs
) as resp:
if resp.status >= 500:
raise ServerError(resp.status)
return await resp.json()
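The state transitions are easier to verify without the async machinery. A synchronous trace of the same rules (a sketch with thresholds shrunk to 3 failures / 2 successes, and the reset timeout replaced by an explicit `trial()` call):

```python
CLOSED, OPEN, HALF_OPEN = "closed", "open", "half_open"

class MiniBreaker:
    """Time-free distillation of the CircuitBreaker state machine above."""

    def __init__(self, failure_threshold: int = 3, success_threshold: int = 2):
        self.state, self.failures, self.successes = CLOSED, 0, 0
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold

    def record(self, ok: bool) -> str:
        if ok:
            if self.state == HALF_OPEN:
                self.successes += 1
                if self.successes >= self.success_threshold:
                    self.state, self.failures = CLOSED, 0   # recovered
            else:
                self.failures = max(0, self.failures - 1)   # decay on success
        else:
            self.failures += 1
            # Any half-open failure, or too many closed failures, opens it
            if self.state == HALF_OPEN or self.failures >= self.failure_threshold:
                self.state = OPEN
        return self.state

    def trial(self) -> None:
        """Stands in for the reset timeout elapsing: allow probe calls."""
        self.state, self.successes = HALF_OPEN, 0

b = MiniBreaker()
print([b.record(ok) for ok in (False, False, False)])  # → ['closed', 'closed', 'open']
b.trial()
print([b.record(ok) for ok in (True, True)])           # → ['half_open', 'closed']
```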
Cooperative Load Shedding Across Services
In microservices, load shedding must be coordinated:
import time
import aiohttp
class LoadSignalPropagator:
"""
Propagates load signals via headers.
Similar to deadline propagation in gRPC.
"""
HEADER_LOAD_LEVEL = 'X-Load-Level'
HEADER_DEADLINE = 'X-Deadline'
HEADER_PRIORITY = 'X-Priority'
def __init__(self, service_name: str):
self.service_name = service_name
self.current_load = 0.0
def inject_headers(self, request_context: dict,
outgoing_headers: dict):
"""
Inject load signals into outgoing request headers.
"""
# Propagate deadline with buffer for this service
if 'deadline' in request_context:
remaining = request_context['deadline'] - time.time()
# Reserve 10% for this service
downstream_deadline = time.time() + (remaining * 0.9)
outgoing_headers[self.HEADER_DEADLINE] = str(downstream_deadline)
# Propagate priority
if 'priority' in request_context:
outgoing_headers[self.HEADER_PRIORITY] = str(request_context['priority'])
# Include current load level
outgoing_headers[self.HEADER_LOAD_LEVEL] = str(self.current_load)
def extract_signals(self, headers: dict) -> dict:
"""
Extract load signals from incoming request headers.
"""
signals = {}
if self.HEADER_DEADLINE in headers:
signals['deadline'] = float(headers[self.HEADER_DEADLINE])
if self.HEADER_PRIORITY in headers:
signals['priority'] = int(headers[self.HEADER_PRIORITY])
if self.HEADER_LOAD_LEVEL in headers:
signals['upstream_load'] = float(headers[self.HEADER_LOAD_LEVEL])
return signals
def should_shed_based_on_signals(self, signals: dict) -> bool:
"""
Decide if request should be shed based on propagated signals.
"""
# Check deadline
if 'deadline' in signals:
remaining = signals['deadline'] - time.time()
if remaining < 0.1: # Less than 100ms
return True
# Shed low priority when upstream is overloaded
if signals.get('upstream_load', 0) > 0.9:
if signals.get('priority', 5) < 3:
return True
return False
# Service with coordinated load shedding
class CoordinatedService:
def __init__(self, name: str, dependencies: list[str]):
self.name = name
self.dependencies = dependencies
self.propagator = LoadSignalPropagator(name)
self.load_shedder = AdaptiveLoadShedder(LoadShedderConfig())
async def handle_request(self, request: Request) -> Response:
# Extract signals from incoming request
signals = self.propagator.extract_signals(request.headers)
# Create context for this request
context = {
'deadline': signals.get('deadline', time.time() + 30),
'priority': signals.get('priority', 5),
'start_time': time.time()
}
# Check if should shed based on signals
if self.propagator.should_shed_based_on_signals(signals):
return Response(status=503, body='Shed based on upstream signals')
# Local load shedding decision
if not self.load_shedder.admit(request):
return Response(status=503, body='Shed due to local overload')
try:
# Process request
result = await self.process(request, context)
return Response(status=200, body=result)
finally:
latency = (time.time() - context['start_time']) * 1000
self.load_shedder.release(request, latency)
async def call_dependency(self, dep_name: str, path: str,
context: dict) -> Response:
"""
Call dependency with propagated signals.
"""
headers = {}
self.propagator.inject_headers(context, headers)
# Check if deadline already passed
if context['deadline'] < time.time():
raise DeadlineExceeded()
async with aiohttp.ClientSession() as session:
timeout = context['deadline'] - time.time()
async with session.get(
f"http://{dep_name}{path}",
headers=headers,
timeout=aiohttp.ClientTimeout(total=timeout)
) as resp:
return await resp.json()
CoDel (Controlled Delay) Algorithm
CoDel manages queue delay to prevent bufferbloat:
import asyncio
import time
from dataclasses import dataclass
from typing import Generic, TypeVar, Optional
from collections import deque
T = TypeVar('T')
@dataclass
class CoDelConfig:
target_delay: float = 0.005 # 5ms target delay
interval: float = 0.100 # 100ms interval
max_queue: int = 1000
@dataclass
class CoDelStats:
drops: int = 0
total: int = 0
current_delay: float = 0.0
dropping: bool = False
class CoDelQueue(Generic[T]):
"""
CoDel (Controlled Delay) queue implementation.
Based on Nichols & Jacobson's algorithm.
"""
    def __init__(self, config: Optional[CoDelConfig] = None):
self.config = config or CoDelConfig()
self.queue: deque[tuple[float, T]] = deque()
self.stats = CoDelStats()
# CoDel state
self.first_above_time = 0
self.drop_next = 0
self.count = 0
self.dropping = False
self.last_count = 0
def enqueue(self, item: T) -> bool:
"""
Add item to queue.
Returns False if queue is full.
"""
if len(self.queue) >= self.config.max_queue:
self.stats.drops += 1
return False
self.queue.append((time.time(), item))
self.stats.total += 1
return True
def dequeue(self) -> Optional[T]:
"""
Remove and return item, applying CoDel dropping.
"""
if not self.queue:
return None
enqueue_time, item = self.queue.popleft()
now = time.time()
sojourn_time = now - enqueue_time
self.stats.current_delay = sojourn_time
# CoDel algorithm
if sojourn_time < self.config.target_delay:
# Good - below target
self.first_above_time = 0
elif self.first_above_time == 0:
# First time above target
self.first_above_time = now + self.config.interval
elif now >= self.first_above_time:
# Been above target for interval - start dropping
self._codel_drop(now, sojourn_time)
return item
def _codel_drop(self, now: float, sojourn_time: float):
"""
CoDel dropping logic.
"""
if not self.dropping:
# Enter dropping state
self.dropping = True
self.stats.dropping = True
# Set drop rate based on previous interval
if self.count > 2 and now - self.drop_next < 16 * self.config.interval:
self.count = self.count - 2
else:
self.count = 1
self.drop_next = now + self._control_law(self.count)
elif now >= self.drop_next:
# Time to potentially drop
while now >= self.drop_next and len(self.queue) > 1:
# Drop head of queue
self.queue.popleft()
self.stats.drops += 1
self.count += 1
self.drop_next = now + self._control_law(self.count)
# Exit dropping state if delay is good
if sojourn_time < self.config.target_delay:
self.dropping = False
self.stats.dropping = False
def _control_law(self, count: int) -> float:
"""
CoDel control law: interval / sqrt(count)
"""
return self.config.interval / (count ** 0.5)
# Usage in server
class CoDelServer:
def __init__(self):
self.queue = CoDelQueue[Request](CoDelConfig(
target_delay=0.010, # 10ms target
interval=0.100, # 100ms interval
max_queue=500
))
async def handle_connection(self, conn):
request = await conn.read()
if not self.queue.enqueue(request):
# Queue full
await conn.write(Response(status=503))
return
# Request will be processed by workers
# CoDel automatically drops if queue delay is too high
async def worker(self):
while True:
request = self.queue.dequeue()
if request:
response = await self.process(request)
await request.conn.write(response)
else:
await asyncio.sleep(0.001)
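The control law is the heart of CoDel's escalation: while the queue stays bad, successive drops are spaced `interval / sqrt(count)` apart, so dropping accelerates the longer the delay persists.

```python
def control_law(interval: float, count: int) -> float:
    """CoDel drop spacing: interval / sqrt(count)."""
    return interval / count ** 0.5

# With a 100ms interval, drops come faster and faster:
spacings = [round(control_law(0.100, n), 4) for n in range(1, 5)]
print(spacings)  # → [0.1, 0.0707, 0.0577, 0.05]
```

The square-root schedule is gentle at first (one drop per interval) but ramps toward a high drop rate if the target delay is never regained.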
Monitoring and Observability
Comprehensive metrics for load shedding:
import time
from prometheus_client import Counter, Gauge, Histogram
# Request metrics
requests_total = Counter(
'requests_total',
'Total requests',
['status', 'path']
)
requests_shed = Counter(
'requests_shed_total',
'Requests shed (rejected)',
['reason']
)
# Queue metrics
queue_depth = Gauge(
'queue_depth',
'Current queue depth',
['queue_name']
)
queue_latency = Histogram(
'queue_latency_seconds',
'Time spent in queue',
['queue_name'],
buckets=[.001, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
)
# Load shedding metrics
shed_rate = Gauge(
'shed_rate',
'Current shedding rate (0-1)',
['service']
)
concurrency_limit = Gauge(
'concurrency_limit',
'Current dynamic concurrency limit',
['service']
)
concurrency_current = Gauge(
'concurrency_current',
'Current concurrent requests',
['service']
)
# Circuit breaker metrics
circuit_state = Gauge(
'circuit_breaker_state',
'Circuit breaker state (0=closed, 1=open, 2=half_open)',
['circuit_name']
)
circuit_failures = Counter(
'circuit_breaker_failures_total',
'Circuit breaker failure count',
['circuit_name']
)
# Backpressure metrics
backpressure_level = Gauge(
'backpressure_level',
'Current backpressure level (0-1)',
['stage']
)
class MetricsMiddleware:
def __init__(self, load_shedder: AdaptiveLoadShedder,
service_name: str):
self.load_shedder = load_shedder
self.service_name = service_name
async def __call__(self, request, call_next):
# Update concurrency metrics
concurrency_current.labels(service=self.service_name).set(
self.load_shedder.current_concurrent
)
concurrency_limit.labels(service=self.service_name).set(
self.load_shedder.dynamic_limit
)
        # Attempt to admit. admit() reserves the concurrency slot that
        # release() in the finally block gives back; calling should_shed()
        # alone would leave current_concurrent unbalanced.
        should_shed, reason = self.load_shedder.should_shed(request)
        if should_shed or not self.load_shedder.admit(request):
requests_shed.labels(reason=reason).inc()
shed_rate.labels(service=self.service_name).set(
self._calculate_shed_rate()
)
return Response(status=503, headers={
'Retry-After': '5',
'X-Shed-Reason': reason
})
# Process request
start_time = time.time()
try:
response = await call_next(request)
requests_total.labels(
status=response.status,
path=request.path
).inc()
return response
finally:
latency = (time.time() - start_time) * 1000
self.load_shedder.release(request, latency)
Summary
Adaptive load shedding and backpressure are essential for building resilient systems:
- Load Shedding: Deliberately reject requests when capacity is exceeded, using AIMD-style dynamic limits, priority-based selection, and deadline awareness
- Backpressure: Propagate capacity signals upstream through bounded queues, token buckets, and adaptive rate limiting
- Circuit Breakers: Stop calls to failing dependencies, providing fast failure and allowing recovery
- CoDel Algorithm: Manage queue delay to prevent bufferbloat and latency spikes
- Coordinated Shedding: Propagate deadlines and priorities across service boundaries for system-wide coordination
The key insight is that systems must choose between serving all requests poorly and serving some requests well. Load shedding makes this choice explicit and controlled, rather than letting the system degrade unpredictably. Combined with backpressure that slows incoming load before queues overflow, these techniques enable graceful degradation under stress.
For production systems:
- Implement multiple layers: rate limiting → concurrency limiting → queue management → circuit breaking
- Use adaptive limits that respond to actual latency, not just static thresholds
- Propagate deadlines and priorities across service boundaries
- Monitor shed rates, queue depths, and circuit breaker states for operational visibility
- Test under realistic load conditions to tune parameters