
Adaptive Load Shedding & Backpressure: Protecting Systems Under Stress

March 21, 2026 · 3 min read


Load shedding and backpressure are complementary techniques for handling system overload. Load shedding deliberately drops requests when capacity is exceeded, while backpressure propagates slowdown signals upstream to reduce incoming load. Together, they prevent cascading failures, protect downstream dependencies, and maintain service quality for admitted requests. Understanding their internals is essential for building systems that degrade gracefully under stress.

Why This Matters in Real-World Projects

Every production system eventually faces overload conditions:

  1. Traffic Spikes: Viral content, marketing campaigns, or breaking news can multiply traffic 10-100x in minutes
  2. Dependency Failures: When a database or downstream service slows down, requests pile up, consuming memory and threads
  3. Cascading Failures: One overloaded service causes timeouts in callers, which then overload, creating system-wide collapse
  4. Resource Exhaustion: Without load shedding, systems consume all memory, file descriptors, or connections before crashing
  5. Quality vs Quantity: Serving 1000 requests slowly is often worse than serving 500 requests quickly and rejecting 500

The Overload Death Spiral

Understanding how systems fail under load reveals why load shedding is essential:

┌────────────────────────────────────────────────────────────────┐
│                    The Overload Death Spiral                   │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  Normal Load                                                   │
│  ───────────────────────────────────────────────────────────  │
│  Requests: 1000/s, Latency: 50ms, Success: 100%               │
│  Resources: CPU 40%, Memory 60%, Connections 500/1000         │
│                                                                │
│  Traffic Spike (2x)                                           │
│  ───────────────────────────────────────────────────────────  │
│  Requests: 2000/s                                             │
│  ├── Queue builds up (waiting threads)                        │
│  ├── Latency increases: 50ms → 200ms                         │
│  ├── Memory increases (queued requests)                       │
│  └── Still functional but degraded                            │
│                                                                │
│  Sustained Overload (queues grow)                             │
│  ───────────────────────────────────────────────────────────  │
│  ├── GC pressure increases (more objects in memory)           │
│  ├── Context switching overhead increases                     │
│  ├── Connection pool exhausted → requests wait                │
│  ├── Client timeouts → retries → MORE load                   │
│  └── Latency: 200ms → 5s → timeout                           │
│                                                                │
│  Cascading Failure                                            │
│  ───────────────────────────────────────────────────────────  │
│  ├── Health checks fail → removed from load balancer          │
│  ├── Traffic redistributed to remaining servers               │
│  ├── Remaining servers now face 3x normal load                │
│  ├── They fail too → domino effect                           │
│  └── Complete system outage                                   │
│                                                                │
└────────────────────────────────────────────────────────────────┘

Load Shedding Fundamentals

Load shedding deliberately rejects requests to protect system capacity:

import threading
import time
from collections import deque
from dataclasses import dataclass

@dataclass
class LoadShedderConfig:
    # Maximum concurrent requests
    max_concurrent: int = 100

    # Maximum queue depth
    max_queue_depth: int = 50

    # Maximum request age in queue (seconds)
    max_queue_time: float = 5.0

    # Target latency (p99)
    target_latency_ms: float = 100.0

    # CPU threshold for shedding
    cpu_threshold: float = 0.85

    # Memory threshold for shedding
    memory_threshold: float = 0.90


class AdaptiveLoadShedder:
    """
    Adaptive load shedding based on multiple signals:
    - Concurrency limits
    - Queue depth
    - Latency targets
    - Resource utilization
    """

    def __init__(self, config: LoadShedderConfig):
        self.config = config
        self.current_concurrent = 0
        self.queue = deque()
        self.lock = threading.Lock()

        # Metrics for adaptive behavior
        self.recent_latencies = deque(maxlen=100)
        self.recent_shed_rate = deque(maxlen=100)

        # AIMD (Additive Increase, Multiplicative Decrease) for concurrency
        self.dynamic_limit = config.max_concurrent

    def should_shed(self, request: 'Request') -> tuple[bool, str]:
        """
        Determine if request should be shed.
        Returns (should_shed, reason).
        """
        with self.lock:
            # Check 1: Hard concurrency limit
            if self.current_concurrent >= self.dynamic_limit:
                # Shed outright if the overflow queue is also full
                if len(self.queue) >= self.config.max_queue_depth:
                    return True, "queue_full"
                # Signal the caller to enqueue for deferred processing
                # instead of executing immediately
                return False, "queued"

            # Check 2: Resource utilization
            cpu_usage = self._get_cpu_usage()
            if cpu_usage > self.config.cpu_threshold:
                return True, f"cpu_high:{cpu_usage:.2f}"

            memory_usage = self._get_memory_usage()
            if memory_usage > self.config.memory_threshold:
                return True, f"memory_high:{memory_usage:.2f}"

            # Check 3: Priority-based shedding under moderate load
            if self.current_concurrent > self.dynamic_limit * 0.8:
                if not self._is_high_priority(request):
                    return True, "low_priority_shed"

            return False, "admitted"

    def admit(self, request: 'Request') -> bool:
        """
        Attempt to admit a request for processing.
        Returns True if admitted, False if shed.
        """
        should_shed, reason = self.should_shed(request)

        if should_shed:
            self._record_shed(reason)
            return False

        with self.lock:
            self.current_concurrent += 1

        return True

    def release(self, request: 'Request', latency_ms: float):
        """
        Release a request slot and record latency.
        """
        with self.lock:
            self.current_concurrent -= 1
            self.recent_latencies.append(latency_ms)

        # Adjust dynamic limit based on latency
        self._adjust_limit(latency_ms)

    def _adjust_limit(self, latency_ms: float):
        """
        AIMD-style adjustment of concurrency limit.
        Additive increase when latency is good,
        Multiplicative decrease when latency is bad.
        """
        with self.lock:
            if latency_ms < self.config.target_latency_ms:
                # Additive increase: slowly grow capacity
                self.dynamic_limit = min(
                    self.config.max_concurrent,
                    self.dynamic_limit + 1
                )
            elif latency_ms > self.config.target_latency_ms * 2:
                # Multiplicative decrease: quickly reduce capacity
                self.dynamic_limit = max(
                    10,  # Minimum limit
                    int(self.dynamic_limit * 0.75)
                )

    def _is_high_priority(self, request: 'Request') -> bool:
        """
        Determine request priority for selective shedding.
        """
        # Health checks are always high priority
        if request.path == '/health':
            return True

        # Authenticated users get priority
        if request.user_id:
            return True

        # Critical business paths
        if request.path.startswith('/api/payments'):
            return True

        return False

    def _get_cpu_usage(self) -> float:
        import psutil
        return psutil.cpu_percent() / 100.0

    def _get_memory_usage(self) -> float:
        import psutil
        return psutil.virtual_memory().percent / 100.0

    def _record_shed(self, reason: str):
        self.recent_shed_rate.append((time.time(), reason))
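The AIMD adjustment is the heart of the shedder, and it is easy to exercise in isolation. A standalone sketch (a simplified, hypothetical `adjust_limit`, not the method above) shows the characteristic sawtooth: fast multiplicative backoff under bad latency, slow additive recovery:

```python
def adjust_limit(limit: int, latency_ms: float,
                 target_ms: float = 100.0,
                 max_limit: int = 100, min_limit: int = 10) -> int:
    """AIMD: +1 on good latency, x0.75 when latency exceeds 2x target."""
    if latency_ms < target_ms:
        return min(max_limit, limit + 1)
    if latency_ms > target_ms * 2:
        return max(min_limit, int(limit * 0.75))
    return limit  # between target and 2x target: hold steady

limit = 100
for _ in range(3):
    limit = adjust_limit(limit, latency_ms=450.0)
print(limit)  # 42: three bad samples collapse the limit 100 -> 75 -> 56 -> 42
for _ in range(5):
    limit = adjust_limit(limit, latency_ms=40.0)
print(limit)  # 47: recovery is one slot per good request
```

The asymmetry is deliberate: a quarter of capacity disappears per bad sample, but only one slot returns per good one, so the limit hugs the safe side of the system's true capacity.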

Queue-Based Load Shedding

Intelligent queue management prevents old requests from consuming capacity:

import asyncio
import heapq
import time
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass(order=True)
class PrioritizedRequest:
    priority: int
    timestamp: float = field(compare=False)
    request: Any = field(compare=False)
    deadline: float = field(compare=False)


class DeadlineAwareQueue:
    """
    Priority queue that automatically expires old requests.
    Inspired by gRPC deadline propagation.
    """

    def __init__(self, max_size: int = 1000, default_deadline: float = 5.0):
        self.max_size = max_size
        self.default_deadline = default_deadline
        self.queue: list[PrioritizedRequest] = []
        self.lock = asyncio.Lock()

    async def enqueue(self, request: Any, priority: int = 5,
                      deadline: Optional[float] = None) -> bool:
        """
        Enqueue a request with priority and deadline.
        Returns False if queue is full.
        """
        async with self.lock:
            # Expire old entries first
            await self._expire_old_entries()

            if len(self.queue) >= self.max_size:
                # Try to make room by dropping the lowest-priority entry.
                # Stored priorities are negated, so the *largest* stored
                # value is the lowest-priority request.
                worst = max(self.queue)
                if priority > -worst.priority:
                    self.queue.remove(worst)
                    heapq.heapify(self.queue)
                else:
                    return False  # Shed this request
            deadline = deadline or (time.time() + self.default_deadline)
            entry = PrioritizedRequest(
                priority=-priority,  # Negative for max-heap behavior
                timestamp=time.time(),
                request=request,
                deadline=deadline
            )
            heapq.heappush(self.queue, entry)
            return True

    async def dequeue(self) -> Optional[Any]:
        """
        Dequeue highest priority non-expired request.
        """
        async with self.lock:
            now = time.time()

            while self.queue:
                entry = heapq.heappop(self.queue)

                # Skip entries that are expired or have less than
                # 100ms of deadline budget left: not enough to process
                remaining = entry.deadline - now
                if remaining < 0.1:
                    continue

                return entry.request

            return None

    async def _expire_old_entries(self):
        """
        Remove expired entries from queue.
        """
        now = time.time()
        self.queue = [e for e in self.queue if e.deadline > now]
        heapq.heapify(self.queue)


# Usage in HTTP server
class LoadSheddingServer:
    def __init__(self):
        self.queue = DeadlineAwareQueue(max_size=500, default_deadline=10.0)
        self.workers = []

    async def handle_request(self, request):
        # Extract deadline from headers (gRPC-style)
        deadline = request.headers.get('X-Deadline')
        if deadline:
            deadline = float(deadline)
        else:
            deadline = time.time() + 30.0  # Default 30s

        # Calculate priority
        priority = self._calculate_priority(request)

        # Attempt to enqueue
        if not await self.queue.enqueue(request, priority, deadline):
            return Response(status=503, body='Service Unavailable')

        # Request will be processed by worker pool
        return await request.future

    def _calculate_priority(self, request) -> int:
        """
        Higher number = higher priority.
        """
        base_priority = 5

        # Boost for authenticated users
        if request.user_id:
            base_priority += 2

        # Boost for important paths
        if '/checkout' in request.path:
            base_priority += 3

        # Reduce for known heavy operations
        if '/export' in request.path or '/report' in request.path:
            base_priority -= 2

        return max(1, min(10, base_priority))
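The worker side of this queue can be sketched as a small standalone loop (a hypothetical `drain` helper, not part of the class above) that pops by priority and silently sheds anything whose deadline passed while it waited:

```python
import heapq
import time

# Entries are (negated_priority, deadline, payload), matching the
# negation trick DeadlineAwareQueue uses for max-heap behavior.
def drain(entries: list[tuple[int, float, str]]) -> list[str]:
    """Pop by priority; shed anything already past its deadline."""
    heapq.heapify(entries)
    served = []
    now = time.time()
    while entries:
        _neg_prio, deadline, payload = heapq.heappop(entries)
        if deadline < now:  # expired while queued: don't waste capacity on it
            continue
        served.append(payload)
    return served

now = time.time()
jobs = [(-9, now + 5, "checkout"),
        (-5, now - 1, "stale-report"),
        (-7, now + 5, "search")]
print(drain(jobs))  # ['checkout', 'search'] -- the expired report is dropped
```

Dropping expired work before processing it is the key property: a request the client has already given up on only burns capacity that admitted requests need.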

Backpressure: Propagating Load Signals

Backpressure communicates capacity constraints upstream:

┌────────────────────────────────────────────────────────────────┐
│                   Backpressure Propagation                     │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  Without Backpressure:                                        │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    │
│  │ Client  │───▶│ Gateway │───▶│ Service │───▶│   DB    │    │
│  │ 1000/s  │    │  Fast   │    │  Fast   │    │  Slow   │    │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘    │
│       │              │              │              │          │
│       │              │         ┌────┴────┐         │          │
│       │              │         │ Queue   │         │          │
│       │              │         │ grows   │◀────────┘          │
│       │              │         │ forever │   (DB can't        │
│       │              │         └────┬────┘    keep up)        │
│       │              │              │                         │
│       │              │         ┌────▼────┐                    │
│       │              │         │  OOM    │                    │
│       │              │         │ Crash   │                    │
│       │              │         └─────────┘                    │
│                                                                │
│  With Backpressure:                                           │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    │
│  │ Client  │◀───│ Gateway │◀───│ Service │◀───│   DB    │    │
│  │ Slows   │    │ Slows   │    │ Bounded │    │  Slow   │    │
│  │  Down   │    │  Down   │    │ Queue   │    │         │    │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘    │
│       │              │              │              │          │
│       │◀─────────────┼──────────────┼──────────────┘          │
│       │     Backpressure signal propagates upstream           │
│       │                                                       │
└────────────────────────────────────────────────────────────────┘

Implementing Backpressure with Bounded Channels

package main

import (
    "context"
    "errors"
    "sync"
    "time"
)

// BoundedChannel implements backpressure via bounded queue
type BoundedChannel[T any] struct {
    ch       chan T
    capacity int
    mu       sync.RWMutex
    closed   bool
}

func NewBoundedChannel[T any](capacity int) *BoundedChannel[T] {
    return &BoundedChannel[T]{
        ch:       make(chan T, capacity),
        capacity: capacity,
    }
}

// Send with backpressure - blocks when full
func (bc *BoundedChannel[T]) Send(ctx context.Context, item T) error {
    bc.mu.RLock()
    if bc.closed {
        bc.mu.RUnlock()
        return errors.New("channel closed")
    }
    bc.mu.RUnlock()

    select {
    case bc.ch <- item:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// TrySend - non-blocking send, returns false if would block
func (bc *BoundedChannel[T]) TrySend(item T) bool {
    bc.mu.RLock()
    if bc.closed {
        bc.mu.RUnlock()
        return false
    }
    bc.mu.RUnlock()

    select {
    case bc.ch <- item:
        return true
    default:
        return false
    }
}

// SendWithTimeout - send with deadline
func (bc *BoundedChannel[T]) SendWithTimeout(item T, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    return bc.Send(ctx, item)
}

// Receive with context
func (bc *BoundedChannel[T]) Receive(ctx context.Context) (T, error) {
    var zero T
    select {
    case item, ok := <-bc.ch:
        if !ok {
            return zero, errors.New("channel closed")
        }
        return item, nil
    case <-ctx.Done():
        return zero, ctx.Err()
    }
}

// Len returns current queue depth
func (bc *BoundedChannel[T]) Len() int {
    return len(bc.ch)
}

// IsFull returns true if at capacity
func (bc *BoundedChannel[T]) IsFull() bool {
    return len(bc.ch) >= bc.capacity
}

// Pressure returns queue utilization (0.0 to 1.0)
func (bc *BoundedChannel[T]) Pressure() float64 {
    return float64(len(bc.ch)) / float64(bc.capacity)
}


// Pipeline with backpressure
type Pipeline struct {
    stages    []*BoundedChannel[*Request]
    workers   int
    wg        sync.WaitGroup
}

func NewPipeline(stageCount, capacity, workers int) *Pipeline {
    stages := make([]*BoundedChannel[*Request], stageCount)
    for i := range stages {
        stages[i] = NewBoundedChannel[*Request](capacity)
    }
    return &Pipeline{
        stages:  stages,
        workers: workers,
    }
}

func (p *Pipeline) Start(ctx context.Context) {
    // Start workers for each stage
    for stageIdx := range p.stages {
        for w := 0; w < p.workers; w++ {
            p.wg.Add(1)
            go p.runWorker(ctx, stageIdx)
        }
    }
}

func (p *Pipeline) runWorker(ctx context.Context, stageIdx int) {
    defer p.wg.Done()

    input := p.stages[stageIdx]

    for {
        req, err := input.Receive(ctx)
        if err != nil {
            return
        }

        // Process this stage
        result := p.processStage(stageIdx, req)

        // If not last stage, send to next (with backpressure)
        if stageIdx < len(p.stages)-1 {
            nextStage := p.stages[stageIdx+1]

            // This will block if next stage is at capacity
            // Backpressure propagates upstream
            if err := nextStage.Send(ctx, result); err != nil {
                // Context cancelled or channel closed
                return
            }
        } else {
            // Final stage - send response
            req.respond(result)
        }
    }
}

// processStage applies this stage's transformation; the actual work is
// application-specific and left abstract in this sketch
func (p *Pipeline) processStage(stageIdx int, req *Request) *Request {
    // ... stage-specific work ...
    return req
}

func (p *Pipeline) Submit(ctx context.Context, req *Request) error {
    // Submit to first stage - blocks if pipeline is backed up
    return p.stages[0].Send(ctx, req)
}

func (p *Pipeline) Pressure() []float64 {
    pressures := make([]float64, len(p.stages))
    for i, stage := range p.stages {
        pressures[i] = stage.Pressure()
    }
    return pressures
}
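You don't need a custom channel to get this behavior in Python: `asyncio.Queue` with a `maxsize` is a bounded stage, and an `await put()` on a full queue is exactly the blocking send above. A small sketch showing a fast producer throttled by a slow consumer:

```python
import asyncio

async def main() -> list[int]:
    q = asyncio.Queue(maxsize=3)  # bounded stage, like BoundedChannel above
    produced: list[int] = []
    results: list[int] = []

    async def producer():
        for i in range(10):
            await q.put(i)  # blocks when the queue is full: backpressure
            produced.append(i)

    async def consumer():
        while True:
            item = await q.get()
            await asyncio.sleep(0.005)  # slow stage; pressure builds upstream
            results.append(item)
            q.task_done()

    prod = asyncio.create_task(producer())
    cons = asyncio.create_task(consumer())
    await asyncio.sleep(0.02)
    # Invariant: the producer can never run more than maxsize + 1 items
    # ahead of the consumer (queue contents plus one item in flight)
    assert len(produced) - len(results) <= q.maxsize + 1
    await prod
    await q.join()
    cons.cancel()
    return results

print(asyncio.run(main()))  # [0, 1, ..., 9], produced under throttle
```

The invariant in the middle is the whole point of bounded queues: memory use per stage is capped by construction, so a slow stage cannot grow a queue "forever" the way the diagram's unbounded case does.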

Token Bucket for Rate-Based Backpressure

package main

import (
    "sync"
    "time"
)

// TokenBucket implements rate limiting with burst capacity
type TokenBucket struct {
    capacity   float64
    tokens     float64
    refillRate float64  // tokens per second
    lastRefill time.Time
    mu         sync.Mutex
}

func NewTokenBucket(capacity float64, refillRate float64) *TokenBucket {
    return &TokenBucket{
        capacity:   capacity,
        tokens:     capacity,
        refillRate: refillRate,
        lastRefill: time.Now(),
    }
}

// TryAcquire attempts to acquire n tokens without blocking
func (tb *TokenBucket) TryAcquire(n float64) bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    tb.refill()

    if tb.tokens >= n {
        tb.tokens -= n
        return true
    }
    return false
}

// Acquire blocks until tokens are available
func (tb *TokenBucket) Acquire(n float64) time.Duration {
    tb.mu.Lock()

    tb.refill()

    if tb.tokens >= n {
        tb.tokens -= n
        tb.mu.Unlock()
        return 0
    }

    // Calculate wait time for the deficit
    deficit := n - tb.tokens
    waitTime := time.Duration(deficit / tb.refillRate * float64(time.Second))

    // Claim the tokens now (the balance goes negative and is paid back
    // by refill), then sleep OUTSIDE the lock so other callers and
    // TryAcquire aren't blocked while we wait
    tb.tokens -= n
    tb.mu.Unlock()

    time.Sleep(waitTime)
    return waitTime
}

func (tb *TokenBucket) refill() {
    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()

    // Add tokens based on elapsed time
    tb.tokens = min(tb.capacity, tb.tokens + elapsed * tb.refillRate)
    tb.lastRefill = now
}

// Available returns current token count
func (tb *TokenBucket) Available() float64 {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    tb.refill()
    return tb.tokens
}


// AdaptiveRateLimiter adjusts rate based on downstream signals
type AdaptiveRateLimiter struct {
    bucket      *TokenBucket
    baseRate    float64
    currentRate float64
    minRate     float64
    maxRate     float64
    mu          sync.RWMutex

    // Vegas-style congestion control
    baseRTT     time.Duration
    recentRTTs  []time.Duration
}

func NewAdaptiveRateLimiter(baseRate, minRate, maxRate float64) *AdaptiveRateLimiter {
    return &AdaptiveRateLimiter{
        bucket:      NewTokenBucket(baseRate*2, baseRate),
        baseRate:    baseRate,
        currentRate: baseRate,
        minRate:     minRate,
        maxRate:     maxRate,
        recentRTTs:  make([]time.Duration, 0, 100),
    }
}

// RecordRTT records a round-trip time and adjusts rate
func (arl *AdaptiveRateLimiter) RecordRTT(rtt time.Duration) {
    arl.mu.Lock()
    defer arl.mu.Unlock()

    arl.recentRTTs = append(arl.recentRTTs, rtt)
    if len(arl.recentRTTs) > 100 {
        arl.recentRTTs = arl.recentRTTs[1:]
    }

    // Update base RTT (minimum observed)
    if arl.baseRTT == 0 || rtt < arl.baseRTT {
        arl.baseRTT = rtt
    }

    // Vegas-style adjustment
    arl.adjustRate()
}

func (arl *AdaptiveRateLimiter) adjustRate() {
    if len(arl.recentRTTs) < 10 || arl.baseRTT == 0 {
        return
    }

    // Calculate average recent RTT
    var sum time.Duration
    for _, rtt := range arl.recentRTTs[len(arl.recentRTTs)-10:] {
        sum += rtt
    }
    avgRTT := sum / 10

    // Vegas diff: expected throughput vs actual
    // diff = (currentRate * baseRTT / avgRTT) - currentRate
    ratio := float64(arl.baseRTT) / float64(avgRTT)

    if ratio > 0.9 {
        // RTT close to base - increase rate
        arl.currentRate = min(arl.maxRate, arl.currentRate * 1.1)
    } else if ratio < 0.5 {
        // RTT doubled - significant congestion
        arl.currentRate = max(arl.minRate, arl.currentRate * 0.5)
    } else if ratio < 0.7 {
        // Moderate congestion
        arl.currentRate = max(arl.minRate, arl.currentRate * 0.9)
    }

    // Rebuild the bucket at the new rate (note: this discards any
    // accumulated burst tokens)
    arl.bucket = NewTokenBucket(arl.currentRate*2, arl.currentRate)
}

func (arl *AdaptiveRateLimiter) TryAcquire() bool {
    return arl.bucket.TryAcquire(1)
}
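The refill arithmetic transfers directly to Python; a compact sketch of the same math (a hypothetical `MiniTokenBucket`, using the monotonic clock) makes the burst-then-reject behavior easy to see:

```python
import time

class MiniTokenBucket:
    """Compact Python sketch of the token-bucket refill math above."""
    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity          # start full: allows an initial burst
        self.refill_rate = refill_rate  # tokens per second
        self.last = time.monotonic()

    def try_acquire(self, n: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

bucket = MiniTokenBucket(capacity=5, refill_rate=10)
granted = sum(bucket.try_acquire() for _ in range(8))
print(granted)  # 5: the burst is served immediately, the rest rejected until refill
```

Note the use of `time.monotonic()` rather than the wall clock: a clock that steps backward (NTP adjustment) would otherwise freeze or inflate the refill.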

Circuit Breaker Integration

Circuit breakers complement load shedding by stopping calls to failing dependencies:

import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional

import aiohttp  # third-party; used by ResilientClient below

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject all calls
    HALF_OPEN = "half_open"  # Testing if recovered


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5      # Failures before opening
    success_threshold: int = 3      # Successes to close from half-open
    reset_timeout: float = 30.0     # Seconds before half-open
    half_open_max_calls: int = 3    # Max concurrent calls in half-open


class CircuitBreaker:
    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0
        self.half_open_calls = 0
        self.lock = asyncio.Lock()

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute function with circuit breaker protection.
        """
        async with self.lock:
            if not await self._can_execute():
                raise CircuitOpenError(self.name, self._time_until_reset())

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except Exception as e:
            await self._on_failure()
            raise

    async def _can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if reset timeout has passed
            if time.time() - self.last_failure_time >= self.config.reset_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                self.success_count = 0
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.config.half_open_max_calls:
                return False
            self.half_open_calls += 1
            return True

        return False

    async def _on_success(self):
        async with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                # Reset failure count on success
                self.failure_count = max(0, self.failure_count - 1)

    async def _on_failure(self):
        async with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                # Any failure in half-open reopens circuit
                self.state = CircuitState.OPEN
            elif self.state == CircuitState.CLOSED:
                if self.failure_count >= self.config.failure_threshold:
                    self.state = CircuitState.OPEN

    def _time_until_reset(self) -> float:
        elapsed = time.time() - self.last_failure_time
        return max(0, self.config.reset_timeout - elapsed)


class CircuitOpenError(Exception):
    def __init__(self, circuit_name: str, retry_after: float):
        self.circuit_name = circuit_name
        self.retry_after = retry_after
        super().__init__(f"Circuit {circuit_name} is open, retry after {retry_after:.1f}s")


# Combined load shedding + circuit breaker
class ResilientClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.circuit = CircuitBreaker("api", CircuitBreakerConfig(
            failure_threshold=5,
            reset_timeout=30.0
        ))
        # Assumes a Python port of the Go AdaptiveRateLimiter shown earlier
        self.rate_limiter = AdaptiveRateLimiter(100, 10, 500)

    async def request(self, path: str, **kwargs) -> Response:
        # Layer 1: Rate limiting with backpressure
        if not self.rate_limiter.try_acquire():
            # Could block here, or reject
            raise RateLimitExceeded()

        # Layer 2: Circuit breaker
        start_time = time.time()
        try:
            result = await self.circuit.call(
                self._do_request, path, **kwargs
            )
            # Record RTT for adaptive rate limiting
            rtt = time.time() - start_time
            self.rate_limiter.record_rtt(rtt)
            return result
        except CircuitOpenError:
            # Fast fail - don't even try
            raise
        except Exception as e:
            rtt = time.time() - start_time
            self.rate_limiter.record_rtt(rtt * 2)  # Penalize failures
            raise

    async def _do_request(self, path: str, **kwargs) -> Response:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.base_url}{path}",
                timeout=aiohttp.ClientTimeout(total=10),
                **kwargs
            ) as resp:
                if resp.status >= 500:
                    raise ServerError(resp.status)
                return await resp.json()

Cooperative Load Shedding Across Services

In microservices, load shedding must be coordinated:

import time
import aiohttp

class LoadSignalPropagator:
    """
    Propagates load signals via headers.
    Similar to deadline propagation in gRPC.
    """

    HEADER_LOAD_LEVEL = 'X-Load-Level'
    HEADER_DEADLINE = 'X-Deadline'
    HEADER_PRIORITY = 'X-Priority'

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.current_load = 0.0

    def inject_headers(self, request_context: dict,
                       outgoing_headers: dict):
        """
        Inject load signals into outgoing request headers.
        """
        # Propagate deadline with buffer for this service
        if 'deadline' in request_context:
            remaining = request_context['deadline'] - time.time()
            # Reserve 10% for this service
            downstream_deadline = time.time() + (remaining * 0.9)
            outgoing_headers[self.HEADER_DEADLINE] = str(downstream_deadline)

        # Propagate priority
        if 'priority' in request_context:
            outgoing_headers[self.HEADER_PRIORITY] = str(request_context['priority'])

        # Include current load level
        outgoing_headers[self.HEADER_LOAD_LEVEL] = str(self.current_load)

    def extract_signals(self, headers: dict) -> dict:
        """
        Extract load signals from incoming request headers.
        """
        signals = {}

        if self.HEADER_DEADLINE in headers:
            signals['deadline'] = float(headers[self.HEADER_DEADLINE])

        if self.HEADER_PRIORITY in headers:
            signals['priority'] = int(headers[self.HEADER_PRIORITY])

        if self.HEADER_LOAD_LEVEL in headers:
            signals['upstream_load'] = float(headers[self.HEADER_LOAD_LEVEL])

        return signals

    def should_shed_based_on_signals(self, signals: dict) -> bool:
        """
        Decide if request should be shed based on propagated signals.
        """
        # Check deadline
        if 'deadline' in signals:
            remaining = signals['deadline'] - time.time()
            if remaining < 0.1:  # Less than 100ms
                return True

        # Shed low priority when upstream is overloaded
        if signals.get('upstream_load', 0) > 0.9:
            if signals.get('priority', 5) < 3:
                return True

        return False
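The 10% deadline reservation above is just arithmetic, and it compounds per hop. A standalone sketch (`inject`/`extract` here are simplified stand-ins for the methods above):

```python
import time

def inject(deadline: float, headers: dict) -> None:
    remaining = deadline - time.time()
    # Forward 90% of the remaining budget, keeping 10% for this hop
    headers['X-Deadline'] = str(time.time() + remaining * 0.9)

def extract(headers: dict) -> float:
    return float(headers['X-Deadline'])

start = time.time()
headers: dict = {}
inject(start + 10.0, headers)       # edge grants a 10s budget
hop1 = extract(headers)
print(round(hop1 - start, 1))       # ~9.0s left after one hop

headers2: dict = {}
inject(hop1, headers2)              # second hop keeps its own 10%
print(round(extract(headers2) - start, 1))  # ~8.1s: budget shrinks geometrically
```

Because each hop multiplies the remaining budget by 0.9, a deep call chain naturally gets less and less time, and a request that can no longer finish anywhere downstream is shed early instead of timing out late.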


# Service with coordinated load shedding
class CoordinatedService:
    def __init__(self, name: str, dependencies: list[str]):
        self.name = name
        self.dependencies = dependencies
        self.propagator = LoadSignalPropagator(name)
        self.load_shedder = AdaptiveLoadShedder(LoadShedderConfig())

    async def handle_request(self, request: Request) -> Response:
        # Extract signals from incoming request
        signals = self.propagator.extract_signals(request.headers)

        # Create context for this request
        context = {
            'deadline': signals.get('deadline', time.time() + 30),
            'priority': signals.get('priority', 5),
            'start_time': time.time()
        }

        # Check if should shed based on signals
        if self.propagator.should_shed_based_on_signals(signals):
            return Response(status=503, body='Shed based on upstream signals')

        # Local load shedding decision
        if not self.load_shedder.admit(request):
            return Response(status=503, body='Shed due to local overload')

        try:
            # Process request
            result = await self.process(request, context)
            return Response(status=200, body=result)
        finally:
            latency = (time.time() - context['start_time']) * 1000
            self.load_shedder.release(request, latency)

    async def call_dependency(self, dep_name: str, path: str,
                              context: dict) -> dict:
        """
        Call dependency with propagated signals.
        Returns the parsed JSON response body.
        """
        headers = {}
        self.propagator.inject_headers(context, headers)

        # Check if deadline already passed
        if context['deadline'] < time.time():
            raise DeadlineExceeded()

        # Note: in production, reuse a shared ClientSession rather than
        # creating one per call
        async with aiohttp.ClientSession() as session:
            timeout = context['deadline'] - time.time()
            async with session.get(
                f"http://{dep_name}{path}",
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as resp:
                return await resp.json()
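The deadline/priority round trip at the heart of this coordination can be shown in isolation. This is a minimal standalone sketch; the header names and helper functions here are illustrative placeholders, not the exact ones used by `LoadSignalPropagator` above:

```python
import time

# Hypothetical header names, for illustration only
HEADER_DEADLINE = "X-Request-Deadline"
HEADER_PRIORITY = "X-Request-Priority"

def inject_headers(context: dict, headers: dict) -> dict:
    """Caller side: copy the absolute deadline and priority into outgoing headers."""
    headers[HEADER_DEADLINE] = str(context["deadline"])
    headers[HEADER_PRIORITY] = str(context["priority"])
    return headers

def extract_signals(headers: dict) -> dict:
    """Callee side: parse the signals back out of incoming headers."""
    signals = {}
    if HEADER_DEADLINE in headers:
        signals["deadline"] = float(headers[HEADER_DEADLINE])
    if HEADER_PRIORITY in headers:
        signals["priority"] = int(headers[HEADER_PRIORITY])
    return signals

# Round trip: the caller sets a 2-second budget, the callee recovers
# how much of that budget remains after network and queueing time.
context = {"deadline": time.time() + 2.0, "priority": 7}
headers = inject_headers(context, {})
signals = extract_signals(headers)
remaining = signals["deadline"] - time.time()
```

Because the deadline is an absolute timestamp rather than a relative timeout, every hop can compute its own remaining budget without knowing how much time upstream hops consumed.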

CoDel (Controlled Delay) Algorithm

CoDel manages queue delay to prevent bufferbloat:

from dataclasses import dataclass
from typing import Generic, TypeVar, Optional
from collections import deque
import time

T = TypeVar('T')

@dataclass
class CoDelConfig:
    target_delay: float = 0.005     # 5ms target delay
    interval: float = 0.100         # 100ms interval
    max_queue: int = 1000


@dataclass
class CoDelStats:
    drops: int = 0
    total: int = 0
    current_delay: float = 0.0
    dropping: bool = False


class CoDelQueue(Generic[T]):
    """
    CoDel (Controlled Delay) queue implementation.
    Based on Nichols & Jacobson's algorithm.
    """

    def __init__(self, config: Optional[CoDelConfig] = None):
        self.config = config or CoDelConfig()
        self.queue: deque[tuple[float, T]] = deque()
        self.stats = CoDelStats()

        # CoDel state
        self.first_above_time = 0
        self.drop_next = 0
        self.count = 0
        self.dropping = False

    def enqueue(self, item: T) -> bool:
        """
        Add item to queue.
        Returns False if queue is full.
        """
        if len(self.queue) >= self.config.max_queue:
            self.stats.drops += 1
            return False

        self.queue.append((time.time(), item))
        self.stats.total += 1
        return True

    def dequeue(self) -> Optional[T]:
        """
        Remove and return item, applying CoDel dropping.
        """
        if not self.queue:
            return None

        enqueue_time, item = self.queue.popleft()
        now = time.time()
        sojourn_time = now - enqueue_time
        self.stats.current_delay = sojourn_time

        # CoDel algorithm
        if sojourn_time < self.config.target_delay:
            # Good - below target; reset tracking and exit dropping state
            self.first_above_time = 0
            self.dropping = False
            self.stats.dropping = False
        elif self.first_above_time == 0:
            # First time above target
            self.first_above_time = now + self.config.interval
        elif now >= self.first_above_time:
            # Been above target for a full interval - start dropping
            self._codel_drop(now)

        return item

    def _codel_drop(self, now: float):
        """
        CoDel dropping logic.
        """
        if not self.dropping:
            # Enter dropping state
            self.dropping = True
            self.stats.dropping = True

            # Resume the previous drop rate if we exited dropping
            # recently, otherwise start over at one drop per interval
            if self.count > 2 and now - self.drop_next < 16 * self.config.interval:
                self.count = self.count - 2
            else:
                self.count = 1

            self.drop_next = now + self._control_law(self.count)

        elif now >= self.drop_next:
            # Time to drop again; the interval shrinks with each drop
            while now >= self.drop_next and len(self.queue) > 1:
                # Drop head of queue
                self.queue.popleft()
                self.stats.drops += 1
                self.count += 1
                self.drop_next = now + self._control_law(self.count)

    def _control_law(self, count: int) -> float:
        """
        CoDel control law: interval / sqrt(count)
        """
        return self.config.interval / (count ** 0.5)


# Usage in server
class CoDelServer:
    def __init__(self):
        self.queue = CoDelQueue[Request](CoDelConfig(
            target_delay=0.010,  # 10ms target
            interval=0.100,      # 100ms interval
            max_queue=500
        ))

    async def handle_connection(self, conn):
        request = await conn.read()
        request.conn = conn  # attach so a worker can write the response back

        if not self.queue.enqueue(request):
            # Queue full - reject immediately
            await conn.write(Response(status=503))
            return

        # Request will be processed by workers;
        # CoDel automatically drops it if queue delay stays too high

    async def worker(self):
        while True:
            request = self.queue.dequeue()
            if request:
                response = await self.process(request)
                await request.conn.write(response)
            else:
                await asyncio.sleep(0.001)
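The control law is worth a quick sanity check with concrete numbers. With the 100ms interval from `CoDelConfig` above, successive drop intervals shrink as the square root of the drop count grows, so drops accelerate the longer the queue stays above target:

```python
# Drop interval = interval / sqrt(count): successive drops get closer
# together the longer the queue stays above the target delay.
interval = 0.100  # 100ms, matching CoDelConfig above

def control_law(count: int) -> float:
    return interval / (count ** 0.5)

# Gap until the next drop, in milliseconds, as the drop count grows
gaps = [round(control_law(c) * 1000, 1) for c in (1, 4, 16, 64)]
# gaps: [100.0, 50.0, 25.0, 12.5]
```

This square-root pacing is what lets CoDel ramp up pressure gradually on a persistently overloaded queue while barely touching one that only briefly exceeds the target.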

Monitoring and Observability

Comprehensive metrics for load shedding:

from prometheus_client import Counter, Histogram, Gauge, Summary

# Request metrics
requests_total = Counter(
    'requests_total',
    'Total requests',
    ['status', 'path']
)

requests_shed = Counter(
    'requests_shed_total',
    'Requests shed (rejected)',
    ['reason']
)

# Queue metrics
queue_depth = Gauge(
    'queue_depth',
    'Current queue depth',
    ['queue_name']
)

queue_latency = Histogram(
    'queue_latency_seconds',
    'Time spent in queue',
    ['queue_name'],
    buckets=[.001, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0]
)

# Load shedding metrics
shed_rate = Gauge(
    'shed_rate',
    'Current shedding rate (0-1)',
    ['service']
)

concurrency_limit = Gauge(
    'concurrency_limit',
    'Current dynamic concurrency limit',
    ['service']
)

concurrency_current = Gauge(
    'concurrency_current',
    'Current concurrent requests',
    ['service']
)

# Circuit breaker metrics
circuit_state = Gauge(
    'circuit_breaker_state',
    'Circuit breaker state (0=closed, 1=open, 2=half_open)',
    ['circuit_name']
)

circuit_failures = Counter(
    'circuit_breaker_failures_total',
    'Circuit breaker failure count',
    ['circuit_name']
)

# Backpressure metrics
backpressure_level = Gauge(
    'backpressure_level',
    'Current backpressure level (0-1)',
    ['stage']
)

class MetricsMiddleware:
    def __init__(self, load_shedder: AdaptiveLoadShedder,
                 service_name: str):
        self.load_shedder = load_shedder
        self.service_name = service_name
        self.shed_count = 0
        self.total_count = 0

    def _calculate_shed_rate(self) -> float:
        return self.shed_count / self.total_count if self.total_count else 0.0

    async def __call__(self, request, call_next):
        self.total_count += 1

        # Update concurrency metrics
        concurrency_current.labels(service=self.service_name).set(
            self.load_shedder.current_concurrent
        )
        concurrency_limit.labels(service=self.service_name).set(
            self.load_shedder.dynamic_limit
        )

        # Attempt to admit
        should_shed, reason = self.load_shedder.should_shed(request)

        if should_shed:
            self.shed_count += 1
            requests_shed.labels(reason=reason).inc()
            shed_rate.labels(service=self.service_name).set(
                self._calculate_shed_rate()
            )
            return Response(status=503, headers={
                'Retry-After': '5',
                'X-Shed-Reason': reason
            })

        # Process request
        start_time = time.time()
        try:
            response = await call_next(request)
            requests_total.labels(
                status=str(response.status),
                path=request.path
            ).inc()
            return response
        finally:
            latency = (time.time() - start_time) * 1000
            self.load_shedder.release(request, latency)

Summary

Adaptive load shedding and backpressure are essential for building resilient systems:

  1. Load Shedding: Deliberately reject requests when capacity is exceeded, using AIMD-style dynamic limits, priority-based selection, and deadline awareness
  2. Backpressure: Propagate capacity signals upstream through bounded queues, token buckets, and adaptive rate limiting
  3. Circuit Breakers: Stop calls to failing dependencies, providing fast failure and allowing recovery
  4. CoDel Algorithm: Manage queue delay to prevent bufferbloat and latency spikes
  5. Coordinated Shedding: Propagate deadlines and priorities across service boundaries for system-wide coordination

The key insight is that systems must choose between serving all requests poorly and serving some requests well. Load shedding makes this choice explicit and controlled, rather than letting the system degrade unpredictably. Combined with backpressure that slows incoming load before queues overflow, these techniques enable graceful degradation under stress.

For production systems:

  • Implement multiple layers: rate limiting → concurrency limiting → queue management → circuit breaking
  • Use adaptive limits that respond to actual latency, not just static thresholds
  • Propagate deadlines and priorities across service boundaries
  • Monitor shed rates, queue depths, and circuit breaker states for operational visibility
  • Test under realistic load conditions to tune parameters
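The layering in the first bullet can be sketched as a single admission gate. This is a minimal illustration under assumed parameters, not the article's earlier classes: a token bucket handles sustained rate, a concurrency cap bounds in-flight work, and a bounded queue absorbs short bursts before anything is rejected:

```python
import time
from collections import deque

class LayeredAdmission:
    """Illustrative layered gate: rate limit -> concurrency cap -> bounded queue."""

    def __init__(self, rate: float, burst: int,
                 max_concurrent: int, max_queue: int):
        self.rate = rate                    # token refill rate per second
        self.burst = burst
        self.tokens = float(burst)
        self.last_refill = time.monotonic()
        self.max_concurrent = max_concurrent
        self.in_flight = 0
        self.queue = deque()
        self.max_queue = max_queue

    def admit(self) -> tuple[bool, str]:
        # Layer 1: token-bucket rate limiting
        now = time.monotonic()
        self.tokens = min(self.burst,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens < 1:
            return False, "rate_limited"
        self.tokens -= 1

        # Layer 2: concurrency limiting
        if self.in_flight >= self.max_concurrent:
            # Layer 3: bounded queue absorbs short bursts
            if len(self.queue) >= self.max_queue:
                return False, "queue_full"
            self.queue.append(now)
            return True, "queued"

        self.in_flight += 1
        return True, "admitted"

# A burst of 7 requests against a gate sized for 2 concurrent, 3 queued,
# with 5 burst tokens: 2 admitted, 3 queued, then the bucket runs dry.
gate = LayeredAdmission(rate=1, burst=5, max_concurrent=2, max_queue=3)
results = [gate.admit()[1] for _ in range(7)]
```

Each layer rejects for a different reason, which is exactly what makes the shed-reason labels in the metrics section meaningful: "rate_limited" and "queue_full" point at different remedies.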


© 2026 Vidhya Sagar Thakur. All rights reserved.