Workflow Orchestration Engine Architecture: Building Temporal-Style Durable Execution
Workflow orchestration engines like Temporal, Cadence, AWS Step Functions, and Prefect solve a deceptively simple problem: how do you reliably execute a multi-step process that spans hours, days, or weeks, survives infrastructure failures, and maintains exactly-once semantics? The answer involves event sourcing, deterministic replay, and distributed state machines—architectural patterns that challenge conventional application design.
The Problem: Long-Running Processes Break Everything
Consider a typical order fulfillment workflow:
1. Validate order
2. Charge payment
3. Reserve inventory
4. Notify warehouse
5. Wait for shipping confirmation (hours/days)
6. Send customer notification
7. Update analytics
Traditional approaches fail:
# Naive implementation - what could go wrong?
def process_order(order):
    validate(order)
    payment_id = charge_payment(order.customer_id, order.total)  # What if crash here?
    inventory_id = reserve_inventory(order.items)  # Already charged but no inventory
    warehouse_request = notify_warehouse(order)
    # Wait for shipping... but process might restart
    shipping_status = wait_for_shipping(warehouse_request)  # How do we resume?
    send_notification(order.customer_id, shipping_status)
    update_analytics(order)
Failure modes:
- Crash after payment but before inventory: Customer charged, no fulfillment
- Restart during shipping wait: Lost context, duplicate notifications
- Network partition: Inconsistent state across services
- Long-running process: Process memory, thread exhaustion
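These failure modes are easy to reproduce. The toy simulation below (helper names are invented for illustration) shows the core problem with the naive approach: because nothing records which steps already ran, a crash followed by a restart from the top re-executes the charge.

```python
# Toy simulation (hypothetical helpers): a crash after charge_payment followed
# by a restart from the top double-charges, because no step is recorded.
charges = []

def charge_payment(customer_id, total):
    charges.append((customer_id, total))  # side effect: money moves
    return f"payment_{len(charges)}"

def process_order(order, crash_after_charge=False):
    charge_payment(order["customer_id"], order["total"])
    if crash_after_charge:
        raise RuntimeError("process crashed")  # simulated infrastructure failure
    # ... reserve inventory, notify warehouse, etc.

order = {"customer_id": "cust_1", "total": 99}
try:
    process_order(order, crash_after_charge=True)  # first attempt dies mid-flight
except RuntimeError:
    pass
process_order(order)  # naive retry: runs every step again from the top

assert len(charges) == 2  # the customer was charged twice
```

Idempotency keys can patch this particular hole, but they must be threaded through every step by hand, which is exactly the bookkeeping a workflow engine automates.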
Compensation-based approaches have their own problems:
# Saga pattern - better but complex
def process_order_saga(order):
    compensations = []
    try:
        validate(order)
        payment_id = charge_payment(order.customer_id, order.total)
        compensations.append(lambda: refund_payment(payment_id))
        inventory_id = reserve_inventory(order.items)
        compensations.append(lambda: release_inventory(inventory_id))
        # Problem: where do we persist compensation state?
        # Problem: how do we handle partial compensation failures?
        # Problem: how do we wait for async events?
    except Exception:
        for comp in reversed(compensations):
            try:
                comp()  # What if compensation fails?
            except Exception:
                # Now we have inconsistent state AND failed compensation
                log.error("Compensation failed, manual intervention needed")
Workflow Engine Architecture: Core Concepts
The Durable Execution Model
Workflow engines solve these problems through a paradigm shift: instead of executing code directly, they record a history of events and replay that history to reconstruct state.
┌─────────────────────────────────────────────────────────────────────────┐
│ WORKFLOW EXECUTION MODEL │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Developer writes: Engine actually does: │
│ ───────────────── ───────────────────── │
│ def workflow(): 1. Start workflow execution │
│ a = activity_1() 2. Schedule activity_1 │
│ b = activity_2(a) • Persist intent to history │
│ c = activity_3() • Worker polls, executes │
│ return combine(b, c) • Result persisted to history │
│ 3. Replay workflow from history │
│ Looks sequential, • activity_1() returns cached result │
│ executes as state 4. Schedule activity_2 (with cached a) │
│ machine with 5. ...continues replay/execute cycle │
│ persistence at 6. All activities complete → workflow done │
│ each step │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Event Sourcing + Deterministic Replay
The core insight: if workflow code is deterministic, you can reconstruct its exact state by replaying its event history.
// Temporal workflow pseudo-code
func OrderWorkflow(ctx workflow.Context, order Order) error {
    // First execution: schedules activity, pauses, waits for result
    // Replay: skips scheduling, returns cached result immediately
    var paymentID string
    if err := workflow.ExecuteActivity(ctx, ChargePayment, order).Get(ctx, &paymentID); err != nil {
        return err
    }

    // First execution: schedules activity
    // Replay: returns cached result
    var inventoryID string
    if err := workflow.ExecuteActivity(ctx, ReserveInventory, order.Items).Get(ctx, &inventoryID); err != nil {
        // Compensation: still durable, still replayed
        workflow.ExecuteActivity(ctx, RefundPayment, paymentID).Get(ctx, nil)
        return err
    }
    _ = inventoryID // used by later fulfillment steps (elided)

    // Wait for signal (can be hours/days) - workflow hibernates
    var shippingStatus ShippingStatus
    workflow.GetSignalChannel(ctx, "shipping").Receive(ctx, &shippingStatus)

    // Continue after signal received
    return workflow.ExecuteActivity(ctx, NotifyCustomer, order, shippingStatus).Get(ctx, nil)
}
How replay works:
Execution History (persisted in database):
═══════════════════════════════════════════════════════════════
Event 1: WorkflowExecutionStarted { order: {...} }
Event 2: ActivityTaskScheduled { activity: "ChargePayment" }
Event 3: ActivityTaskCompleted { result: "payment_123" }
Event 4: ActivityTaskScheduled { activity: "ReserveInventory" }
Event 5: ActivityTaskCompleted { result: "inventory_456" }
Event 6: SignalReceived { name: "shipping", data: {...} }
Event 7: ActivityTaskScheduled { activity: "NotifyCustomer" }
... (worker crashes here, restarts) ...
Replay:
───────
1. Load events 1-7 from database
2. Execute workflow code:
- workflow.ExecuteActivity(ChargePayment)
→ Event 3 exists, return "payment_123" (no execution)
- workflow.ExecuteActivity(ReserveInventory)
→ Event 5 exists, return "inventory_456" (no execution)
- workflow.GetSignalChannel().Receive()
→ Event 6 exists, return shipping status (no waiting)
- workflow.ExecuteActivity(NotifyCustomer)
→ Event 7 exists (scheduled), wait for completion
3. Continue from where we left off
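The record/replay cycle above can be sketched in a few lines. This is an illustrative toy, not Temporal's implementation: activity results are appended to a persisted history list, and a restarted "worker" replays them from history instead of re-running the side effects.

```python
# Minimal record/replay sketch (illustrative only, not Temporal's engine).
class ReplayEngine:
    def __init__(self, history=None):
        self.history = history if history is not None else []  # persisted event log
        self.cursor = 0  # position in history during replay

    def execute_activity(self, fn, *args):
        if self.cursor < len(self.history):
            result = self.history[self.cursor]  # replay: cached result, no side effect
        else:
            result = fn(*args)                  # first execution: run and record
            self.history.append(result)
        self.cursor += 1
        return result

calls = []

def charge_payment(order_id):
    calls.append("charge")
    return f"payment_for_{order_id}"

def reserve_inventory(order_id):
    calls.append("reserve")
    return f"inventory_for_{order_id}"

def order_workflow(engine, order_id):
    p = engine.execute_activity(charge_payment, order_id)
    i = engine.execute_activity(reserve_inventory, order_id)
    return p, i

# First execution: both activities actually run and are recorded.
engine = ReplayEngine()
order_workflow(engine, "o1")

# Simulated worker restart: fresh engine, same persisted history.
# Replay returns recorded results without re-running either side effect.
replayed = ReplayEngine(history=engine.history)
result = order_workflow(replayed, "o1")

assert calls == ["charge", "reserve"]  # side effects ran exactly once
assert result == ("payment_for_o1", "inventory_for_o1")
```

The determinism requirement falls directly out of this design: replay only works if the workflow asks for the same activities in the same order every time.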
Architecture Deep Dive
System Components
┌─────────────────────────────────────────────────────────────────────────────┐
│ WORKFLOW ORCHESTRATION ARCHITECTURE │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Client Layer │
│ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │
│ │ Workflow Client │ │ Signal Sender │ │ Query Handler │ │
│ │ (Start/Cancel) │ │ (Send Events) │ │ (Read State) │ │
│ └───────┬─────────┘ └───────┬────────┘ └───────┬────────┘ │
└──────────┼────────────────────┼───────────────────┼──────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ Frontend Service │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ gRPC API │ │ Rate Limiter │ │ Request │ │
│ │ Gateway │ │ & Auth │ │ Router │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└──────────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ History Service │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Shard Manager (N shards) │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Shard 0 │ │ Shard 1 │ │ Shard 2 │ │ Shard N │ │ │
│ │ │ ─────── │ │ ─────── │ │ ─────── │ │ ─────── │ │ │
│ │ │Workflows│ │Workflows│ │Workflows│ │Workflows│ │ │
│ │ │ Mutable │ │ Mutable │ │ Mutable │ │ Mutable │ │ │
│ │ │ State │ │ State │ │ State │ │ State │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────┐ ┌────────────────────────────────┐ │
│ │ Workflow State │ │ Timer/Schedule Manager │ │
│ │ Machine │ │ (Durable timers, cron) │ │
│ └──────────────────────┘ └────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Matching Service │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Task Queues │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Workflow Tasks │ │ Activity Tasks │ │ │
│ │ │ ─────────────── │ │ ─────────────── │ │ │
│ │ │ Queue: default │ │ Queue: default │ │ │
│ │ │ Queue: priority │ │ Queue: io-bound │ │ │
│ │ │ Queue: batch │ │ Queue: cpu-bound│ │ │
│ │ └─────────────────┘ └─────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Task Dispatch: Long-poll with lease, rate limiting, routing│ │
│ └────────────────────────────────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Worker Pool │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Workflow Worker │ │ Workflow Worker │ │ Workflow Worker │ │
│ │ ──────────────── │ │ ──────────────── │ │ ──────────────── │ │
│ │ - Deterministic │ │ - Deterministic │ │ - Deterministic │ │
│ │ - Replay-safe │ │ - Replay-safe │ │ - Replay-safe │ │
│ │ - No side effects│ │ - No side effects│ │ - No side effects│ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Activity Worker │ │ Activity Worker │ │ Activity Worker │ │
│ │ ──────────────── │ │ ──────────────── │ │ ──────────────── │ │
│ │ - Side effects OK│ │ - Side effects OK│ │ - Side effects OK│ │
│ │ - HTTP calls │ │ - DB operations │ │ - External APIs │ │
│ │ - File I/O │ │ - Compute tasks │ │ - Any operation │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Persistence Layer │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Primary Database │ │
│ │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │
│ │ │ Executions │ │ History Events │ │ Visibility │ │ │
│ │ │ (workflow │ │ (event sourcing│ │ (search/query) │ │ │
│ │ │ mutable state)│ │ immutable log)│ │ │ │ │
│ │ └────────────────┘ └────────────────┘ └────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ Supported: PostgreSQL, MySQL, Cassandra, Elasticsearch │
└─────────────────────────────────────────────────────────────────────┘
History Service: The State Machine Engine
The history service maintains workflow state through a carefully designed state machine:
// Workflow execution states
type WorkflowState int

const (
    WorkflowStateRunning        WorkflowState = iota // Actively executing
    WorkflowStateCompleted                           // Successfully finished
    WorkflowStateFailed                              // Terminated with error
    WorkflowStateCanceled                            // Explicitly canceled
    WorkflowStateContinuedAsNew                      // Restarted with new run ID
    WorkflowStateTimedOut                            // Exceeded timeout
    WorkflowStateTerminated                          // Force terminated
)

// Mutable state: reconstructed from history on each workflow task
type MutableState struct {
    // Execution info
    ExecutionInfo *ExecutionInfo

    // Pending activities (scheduled but not completed)
    PendingActivities map[int64]*ActivityInfo

    // Pending child workflows
    PendingChildExecutions map[int64]*ChildExecutionInfo

    // Pending timers
    PendingTimers map[string]*TimerInfo

    // Pending signals
    PendingSignals map[int64]*SignalInfo

    // Buffered events (received during workflow task execution)
    BufferedEvents []*HistoryEvent

    // Next event ID (for conflict detection)
    NextEventID int64
}
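Conceptually, mutable state is a fold over the event list: each event either opens a pending item or closes one. A minimal sketch, with event shapes invented for illustration:

```python
# Sketch: rebuilding mutable state by folding over the event history.
# Event shapes are illustrative, not Temporal's actual wire format.

def rebuild_mutable_state(events):
    state = {"pending_activities": {}, "next_event_id": 1}
    for event in events:
        eid = state["next_event_id"]
        if event["type"] == "ActivityTaskScheduled":
            state["pending_activities"][eid] = event["activity"]
        elif event["type"] == "ActivityTaskCompleted":
            # Completion closes the pending entry opened by its schedule event.
            state["pending_activities"].pop(event["scheduled_event_id"], None)
        state["next_event_id"] = eid + 1
    return state

history = [
    {"type": "WorkflowExecutionStarted"},
    {"type": "ActivityTaskScheduled", "activity": "ChargePayment"},     # event 2
    {"type": "ActivityTaskCompleted", "scheduled_event_id": 2},
    {"type": "ActivityTaskScheduled", "activity": "ReserveInventory"},  # event 4
]

state = rebuild_mutable_state(history)
assert state["pending_activities"] == {4: "ReserveInventory"}  # only the open one
assert state["next_event_id"] == 5
```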
Event sourcing internals:
-- History events table structure
CREATE TABLE history_events (
    shard_id      INT NOT NULL,
    namespace_id  BINARY(16) NOT NULL,
    workflow_id   VARCHAR(255) NOT NULL,
    run_id        BINARY(16) NOT NULL,
    event_id      BIGINT NOT NULL,       -- Sequential within execution
    event_type    INT NOT NULL,          -- Enum: ActivityScheduled, TimerFired, etc.
    event_time    TIMESTAMP NOT NULL,
    version       BIGINT NOT NULL,       -- For multi-region conflict resolution
    data          BLOB NOT NULL,         -- Protobuf-encoded event
    data_encoding VARCHAR(16) NOT NULL,  -- proto3, json
    PRIMARY KEY (shard_id, namespace_id, workflow_id, run_id, event_id),
    INDEX idx_shard_event_time (shard_id, event_time)
);

-- Execution state (mutable, derived from history)
CREATE TABLE executions (
    shard_id           INT NOT NULL,
    namespace_id       BINARY(16) NOT NULL,
    workflow_id        VARCHAR(255) NOT NULL,
    run_id             BINARY(16) NOT NULL,
    state              INT NOT NULL,
    next_event_id      BIGINT NOT NULL,
    last_write_version BIGINT NOT NULL,
    -- Denormalized for query performance
    workflow_type_name VARCHAR(255),
    start_time         TIMESTAMP,
    execution_time     TIMESTAMP,
    -- Serialized mutable state
    execution_state    BLOB,
    PRIMARY KEY (shard_id, namespace_id, workflow_id, run_id),
    INDEX idx_status (shard_id, namespace_id, state),
    INDEX idx_workflow_type (shard_id, namespace_id, workflow_type_name)
);
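The `next_event_id` column is what makes concurrent writers safe: an append is a conditional update that only succeeds if the writer's view of the execution is still current. A sketch of that optimistic-concurrency check, using SQLite and a pared-down single-shard schema:

```python
# Sketch: optimistic-concurrency append using next_event_id, against a
# simplified schema (SQLite standing in for the production database).
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE executions (workflow_id TEXT PRIMARY KEY, next_event_id INTEGER);
CREATE TABLE history_events (workflow_id TEXT, event_id INTEGER, event_type TEXT,
                             PRIMARY KEY (workflow_id, event_id));
INSERT INTO executions VALUES ('wf1', 1);
""")

def append_event(conn, workflow_id, expected_next_id, event_type):
    # Conditional update: succeeds only if no concurrent writer advanced the id.
    cur = conn.execute(
        "UPDATE executions SET next_event_id = next_event_id + 1 "
        "WHERE workflow_id = ? AND next_event_id = ?",
        (workflow_id, expected_next_id))
    if cur.rowcount == 0:
        return False  # lost the race; caller must reload state and retry
    conn.execute("INSERT INTO history_events VALUES (?, ?, ?)",
                 (workflow_id, expected_next_id, event_type))
    conn.commit()
    return True

assert append_event(db, "wf1", 1, "WorkflowExecutionStarted")
assert not append_event(db, "wf1", 1, "ActivityTaskScheduled")  # stale expected id
assert append_event(db, "wf1", 2, "ActivityTaskScheduled")
```

In the real system the update and insert also run in one transaction per shard, which is why each workflow is owned by exactly one shard at a time.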
Matching Service: Task Dispatch
The matching service implements sophisticated task routing:
// Task queue implementation
type TaskQueue struct {
    sync.RWMutex
    name      string
    queueType TaskQueueType // Workflow or Activity

    // Pending tasks waiting for workers
    taskBuffer *PriorityBuffer

    // Workers polling for tasks (long-poll connections)
    pollers []*Poller

    // Rate limiting
    rateLimiter *RateLimiter

    // Sticky queue for workflow affinity
    stickyInfo *StickyInfo
}

// Task dispatch algorithm
func (q *TaskQueue) DispatchTask(ctx context.Context, task *Task) error {
    q.Lock()
    defer q.Unlock()

    // Check rate limit
    if !q.rateLimiter.Allow() {
        return ErrRateLimited
    }

    // Try to dispatch to a waiting poller immediately
    for _, poller := range q.pollers {
        if poller.TryDispatch(task) {
            metrics.TaskDispatchLatency.Observe(time.Since(task.CreatedTime))
            return nil
        }
    }

    // No available poller, buffer the task
    if err := q.taskBuffer.Push(task); err != nil {
        return err // Buffer full, backpressure
    }
    return nil
}

// Long-poll implementation
func (q *TaskQueue) PollForTask(ctx context.Context, workerID string) (*Task, error) {
    poller := &Poller{
        workerID: workerID,
        taskChan: make(chan *Task, 1),
    }

    q.Lock()
    // First try to get a buffered task
    if task := q.taskBuffer.Pop(); task != nil {
        q.Unlock()
        return task, nil
    }
    // Register as a waiting poller
    q.pollers = append(q.pollers, poller)
    q.Unlock()

    defer func() {
        q.Lock()
        q.removePoller(poller)
        q.Unlock()
    }()

    // Wait for a task or timeout (typically 60s)
    select {
    case task := <-poller.taskChan:
        return task, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}
Sticky execution for cache efficiency:
Normal Task Dispatch:
────────────────────
Task → Any available worker → Cache miss → Rebuild workflow state from history
Sticky Execution:
─────────────────
Task → Same worker that ran last decision → Cache hit → Fast execution
┌─────────────────────────────────────────────────────────────────────┐
│ STICKY EXECUTION │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Workflow Task 1: │
│ ┌──────────┐ ┌───────────────┐ │
│ │ History │───▶│ Worker A │ Cache workflow context │
│ │ Service │ │ (executes) │ (goroutines, local state) │
│ └──────────┘ └───────────────┘ │
│ │ │
│ ▼ Schedule activity, set sticky queue │
│ ┌───────────────┐ │
│ │ Sticky Queue: │ │
│ │ worker-A-uuid │ │
│ └───────────────┘ │
│ │
│ Workflow Task 2 (after activity completes): │
│ ┌──────────┐ ┌───────────────┐ │
│ │ Matching │───▶│ Worker A │ Cache HIT: no replay needed │
│ │ Service │ │ (same worker) │ Continue from cached state │
│ └──────────┘ └───────────────┘ │
│ │
│ If Worker A unavailable (timeout): │
│ ┌──────────┐ ┌───────────────┐ │
│ │ Matching │───▶│ Worker B │ Cache MISS: full replay │
│ │ Service │ │ (any worker) │ Rebuild state from history │
│ └──────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
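The routing decision behind the diagram can be sketched as a small function (worker and cache names are illustrative): prefer the sticky worker while its cache is warm and it is reachable; otherwise accept the cost of a full history replay on any available worker.

```python
# Sketch of sticky routing: prefer the worker that cached this workflow's
# state; fall back to any worker (full replay) when it is unavailable.
workflow_cache = {}   # worker_id -> set of workflow ids with cached state
replays = []          # workflows that required a full history replay

def route_task(workflow_id, sticky_worker, sticky_worker_up, fallback_worker):
    if sticky_worker_up and workflow_id in workflow_cache.get(sticky_worker, set()):
        return sticky_worker  # cache hit: continue from in-memory state
    # Cache miss: any worker can take it, rebuilding state from history.
    replays.append(workflow_id)
    workflow_cache.setdefault(fallback_worker, set()).add(workflow_id)
    return fallback_worker

workflow_cache["worker-A"] = {"wf1"}
assert route_task("wf1", "worker-A", True, "worker-B") == "worker-A"   # hit
assert route_task("wf1", "worker-A", False, "worker-B") == "worker-B"  # sticky timeout
assert replays == ["wf1"]  # exactly one full replay was needed
```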
Determinism Requirements and Non-Determinism Detection
Workflow code must be deterministic—given the same history, it must make the same decisions:
// NON-DETERMINISTIC: Different results on replay
func BadWorkflow(ctx workflow.Context) error {
    // WRONG: Random values differ between replays
    if rand.Intn(100) < 50 {
        workflow.ExecuteActivity(ctx, PathA)
    } else {
        workflow.ExecuteActivity(ctx, PathB)
    }

    // WRONG: Time differs between replays
    if time.Now().Hour() < 12 {
        workflow.ExecuteActivity(ctx, MorningActivity)
    }

    // WRONG: External state may change
    result, _ := http.Get("https://api.example.com/status")
    if result.StatusCode == 200 {
        workflow.ExecuteActivity(ctx, OnlineActivity)
    }
    return nil
}

// DETERMINISTIC: Same results on replay
func GoodWorkflow(ctx workflow.Context) error {
    // RIGHT: Use a side effect for random values (recorded in history)
    var randomValue int
    workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
        return rand.Intn(100)
    }).Get(&randomValue)
    if randomValue < 50 {
        workflow.ExecuteActivity(ctx, PathA)
    } else {
        workflow.ExecuteActivity(ctx, PathB)
    }

    // RIGHT: Use workflow time (from history)
    workflowTime := workflow.Now(ctx)
    if workflowTime.Hour() < 12 {
        workflow.ExecuteActivity(ctx, MorningActivity)
    }

    // RIGHT: External calls via activities (recorded in history)
    var status int
    workflow.ExecuteActivity(ctx, CheckExternalStatus).Get(ctx, &status)
    if status == 200 {
        workflow.ExecuteActivity(ctx, OnlineActivity)
    }
    return nil
}
Non-determinism detection:
// Replay detects non-determinism by comparing commands against history
type ReplayDecisionValidator struct {
    historyEvents []*HistoryEvent
    currentIndex  int
}

func (v *ReplayDecisionValidator) ValidateCommand(cmd *Command) error {
    if v.currentIndex >= len(v.historyEvents) {
        // New command during replay - non-determinism
        return &NonDeterminismError{
            Message: "new command generated during replay",
            Command: cmd,
        }
    }
    expectedEvent := v.historyEvents[v.currentIndex]
    if !commandMatchesEvent(cmd, expectedEvent) {
        return &NonDeterminismError{
            Message:  "command doesn't match history",
            Command:  cmd,
            Expected: expectedEvent,
        }
    }
    v.currentIndex++
    return nil
}
Advanced Patterns
Child Workflows and Workflow Composition
// Parent workflow orchestrating multiple child workflows
func OrderFulfillmentWorkflow(ctx workflow.Context, orders []Order) error {
    // Execute order workflows in parallel
    var futures []workflow.Future
    for _, order := range orders {
        // Each child workflow is independently durable
        future := workflow.ExecuteChildWorkflow(ctx, SingleOrderWorkflow, order)
        futures = append(futures, future)
    }

    // Wait for all, with a configurable failure policy
    results := make([]OrderResult, len(orders))
    for i, future := range futures {
        if err := future.Get(ctx, &results[i]); err != nil {
            // Child failure handling:
            // - ParentClosePolicy: TERMINATE, ABANDON, or REQUEST_CANCEL
            // - Can continue with partial results
            log.Error("Order failed", "orderID", orders[i].ID, "error", err)
        }
    }
    return nil
}

// Continue-as-new: Reset history for long-running workflows
func LongRunningWorkflow(ctx workflow.Context, state State) error {
    // Workflows accumulate history events;
    // after ~10k events, performance degrades
    for workflow.GetInfo(ctx).GetCurrentHistoryLength() < 5000 {
        // Process the next batch
        state = processNextBatch(ctx, state)
    }
    // Continue with fresh history
    return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
Saga Pattern with Durable Compensation
// Saga with automatic compensation tracking
type SagaState struct {
    completedSteps []string
}

func SagaWorkflow(ctx workflow.Context, booking BookingRequest) error {
    saga := &SagaState{}

    // Flight booking
    flightID, err := executeWithCompensation(ctx, saga,
        "book_flight",
        func() (string, error) {
            var id string
            err := workflow.ExecuteActivity(ctx, BookFlight, booking.Flight).Get(ctx, &id)
            return id, err
        },
        func(id string) error {
            return workflow.ExecuteActivity(ctx, CancelFlight, id).Get(ctx, nil)
        },
    )
    if err != nil {
        return compensateAndFail(ctx, saga, err)
    }

    // Hotel booking
    hotelID, err := executeWithCompensation(ctx, saga,
        "book_hotel",
        func() (string, error) {
            var id string
            err := workflow.ExecuteActivity(ctx, BookHotel, booking.Hotel).Get(ctx, &id)
            return id, err
        },
        func(id string) error {
            return workflow.ExecuteActivity(ctx, CancelHotel, id).Get(ctx, nil)
        },
    )
    if err != nil {
        return compensateAndFail(ctx, saga, err)
    }

    // Car rental
    carID, err := executeWithCompensation(ctx, saga,
        "rent_car",
        func() (string, error) {
            var id string
            err := workflow.ExecuteActivity(ctx, RentCar, booking.Car).Get(ctx, &id)
            return id, err
        },
        func(id string) error {
            return workflow.ExecuteActivity(ctx, CancelCarRental, id).Get(ctx, nil)
        },
    )
    if err != nil {
        return compensateAndFail(ctx, saga, err)
    }

    _ = flightID // booking IDs would be returned to the caller (elided)
    _ = hotelID
    _ = carID
    return nil // All bookings successful
}

func compensateAndFail(ctx workflow.Context, saga *SagaState, originalErr error) error {
    // Run compensations in reverse order
    for i := len(saga.completedSteps) - 1; i >= 0; i-- {
        step := saga.completedSteps[i]
        // Compensation activities are also durable
        if err := runCompensation(ctx, step); err != nil {
            // Log but continue compensating
            log.Error("Compensation failed", "step", step, "error", err)
        }
    }
    return originalErr
}
Durable Timers and Scheduled Workflows
// Timer implementation - survives process restarts
func ReminderWorkflow(ctx workflow.Context, userID string) error {
    // The timer is persisted as a history event; if the worker crashes,
    // the timer still fires and replay picks up from this point.

    // Wait 7 days (workflow hibernates, no worker resources consumed)
    if err := workflow.Sleep(ctx, 7*24*time.Hour); err != nil {
        return err
    }

    // Send reminder
    return workflow.ExecuteActivity(ctx, SendReminder, userID).Get(ctx, nil)
}

// Cron workflow - scheduled execution
func DailyReportWorkflow(ctx workflow.Context) error {
    // Generate the daily report
    report := generateReport(ctx)

    // Send report
    workflow.ExecuteActivity(ctx, SendReport, report).Get(ctx, nil)

    // The cron schedule is managed by the engine, not workflow code:
    // the next run starts automatically at the scheduled time
    return nil
}

// Client-side cron setup
client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
    ID:           "daily-report",
    TaskQueue:    "reports",
    CronSchedule: "0 9 * * *", // 9 AM daily
}, DailyReportWorkflow)
Multi-Region and Consistency
Cross-Region Replication
┌─────────────────────────────────────────────────────────────────────────────┐
│ MULTI-REGION WORKFLOW REPLICATION │
└─────────────────────────────────────────────────────────────────────────────┘
Region: US-EAST (Active) Region: EU-WEST (Standby)
┌────────────────────────────┐ ┌────────────────────────────┐
│ │ │ │
│ ┌────────────────────┐ │ │ ┌────────────────────┐ │
│ │ Frontend Service │ │ │ │ Frontend Service │ │
│ └────────┬───────────┘ │ │ └────────┬───────────┘ │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ ┌────────────────────┐ │ │ ┌────────────────────┐ │
│ │ History Service │ │ │ │ History Service │ │
│ │ (accepts writes) │ │ │ │ (read-only) │ │
│ └────────┬───────────┘ │ │ └────────┬───────────┘ │
│ │ │ │ ▲ │
│ ▼ │ │ │ │
│ ┌────────────────────┐ │ Async │ ┌────────────────────┐ │
│ │ Primary Database │────┼──────────┼───▶│ Replica Database │ │
│ │ │ │Replication│ │ │ │
│ └────────────────────┘ │ │ └────────────────────┘ │
│ │ │ │
└────────────────────────────┘ └────────────────────────────┘
Conflict Resolution: Version vectors per event
─────────────────────────────────────────────
Event arrives at both regions simultaneously:
- Each region assigns version from its version clock
- On merge, higher version wins (last-writer-wins)
- For same version, compare region priority
Failover Process:
────────────────
1. Detect primary region failure (health checks)
2. Promote standby to active (DNS update)
3. New writes go to EU-WEST
4. When US-EAST recovers, it becomes standby
5. Reconcile any divergent history (rare with proper fencing)
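The resolution rule described above, higher version wins with ties broken by region priority, can be sketched as follows (region names and priorities are assumed for illustration):

```python
# Sketch of the conflict-resolution rule: the higher event version wins;
# on a version tie, the region with higher priority wins deterministically.
REGION_PRIORITY = {"us-east": 2, "eu-west": 1}  # assumed configuration

def resolve(event_a, event_b):
    if event_a["version"] != event_b["version"]:
        return event_a if event_a["version"] > event_b["version"] else event_b
    # Same version written in both regions: break the tie by region priority.
    pa = REGION_PRIORITY[event_a["region"]]
    pb = REGION_PRIORITY[event_b["region"]]
    return event_a if pa >= pb else event_b

a = {"region": "us-east", "version": 7, "data": "signal-from-us"}
b = {"region": "eu-west", "version": 8, "data": "signal-from-eu"}
assert resolve(a, b)["data"] == "signal-from-eu"   # higher version wins

c = {"region": "eu-west", "version": 7, "data": "tie-eu"}
assert resolve(a, c)["data"] == "signal-from-us"   # tie: region priority wins
```

Because both regions apply the same deterministic rule, they converge on the same winner regardless of the order in which replicated events arrive.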
Consistency Guarantees
// Workflow visibility consistency
type VisibilityConsistency int

const (
    // Eventual consistency (default) - uses the search index
    EventuallyConsistent VisibilityConsistency = iota
    // Strong consistency - reads from the primary database
    StronglyConsistent
)

// List queries are eventually consistent (served by the visibility store)
results, err := client.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
    Namespace: "default",
    Query:     `WorkflowType = "OrderWorkflow" AND Status = "Running"`,
})

// Get a single workflow with strong consistency:
// always reads from the primary database
execution, err := client.DescribeWorkflowExecution(ctx, workflowID, runID)
Why This Matters in Production
Real-World Use Cases
1. Financial Transaction Processing:
// Money transfer with guaranteed completion
func TransferWorkflow(ctx workflow.Context, from, to string, amount Money) error {
    // Debit is durable - won't lose money
    if err := workflow.ExecuteActivity(ctx, Debit, from, amount).Get(ctx, nil); err != nil {
        return err // Nothing moved yet, no compensation needed
    }

    // Credit is durable - retried with backoff
    var err error
    for i := 0; i < 3; i++ {
        err = workflow.ExecuteActivity(ctx, Credit, to, amount).Get(ctx, nil)
        if err == nil {
            return nil // Success
        }
        workflow.Sleep(ctx, time.Duration(i+1)*time.Minute) // Backoff
    }

    // Refund the sender on permanent failure
    workflow.ExecuteActivity(ctx, Credit, from, amount).Get(ctx, nil)
    return fmt.Errorf("transfer failed after retries")
}
2. ML Pipeline Orchestration:
func MLTrainingPipeline(ctx workflow.Context, config TrainingConfig) error {
    // Data preparation (hours)
    var preparedData Dataset
    if err := workflow.ExecuteActivity(ctx, PrepareDataset, config).Get(ctx, &preparedData); err != nil {
        return err
    }

    // Hyperparameter search (days, one child workflow per candidate)
    var bestParams HyperParams
    bestAccuracy := 0.0
    for _, params := range config.ParamGrid {
        var result TrainResult
        if err := workflow.ExecuteChildWorkflow(ctx, TrainModel, params, preparedData).Get(ctx, &result); err != nil {
            return err
        }
        if result.Accuracy > bestAccuracy {
            bestAccuracy = result.Accuracy
            bestParams = result.Params
        }
    }

    // Final training with the best params
    var model Model
    if err := workflow.ExecuteActivity(ctx, TrainFinalModel, bestParams).Get(ctx, &model); err != nil {
        return err
    }

    // Deploy (with human approval signal)
    var approved bool
    workflow.GetSignalChannel(ctx, "deploy-approval").Receive(ctx, &approved)
    if approved {
        return workflow.ExecuteActivity(ctx, DeployModel, model).Get(ctx, nil)
    }
    return nil
}
3. Customer Onboarding Workflow:
func CustomerOnboarding(ctx workflow.Context, customer Customer) error {
    // Create account
    workflow.ExecuteActivity(ctx, CreateAccount, customer).Get(ctx, nil)

    // Send welcome email and wait for verification
    workflow.ExecuteActivity(ctx, SendWelcomeEmail, customer.Email).Get(ctx, nil)

    // Wait up to 7 days for email verification
    verified := false
    selector := workflow.NewSelector(ctx)
    selector.AddReceive(workflow.GetSignalChannel(ctx, "email-verified"), func(c workflow.ReceiveChannel, more bool) {
        verified = true
    })
    selector.AddFuture(workflow.NewTimer(ctx, 7*24*time.Hour), func(f workflow.Future) {
        // Timeout, not verified
    })
    selector.Select(ctx)

    if !verified {
        workflow.ExecuteActivity(ctx, SendReminderEmail, customer.Email).Get(ctx, nil)

        // Wait another 3 days
        selector = workflow.NewSelector(ctx)
        selector.AddReceive(workflow.GetSignalChannel(ctx, "email-verified"), func(c workflow.ReceiveChannel, more bool) {
            verified = true
        })
        selector.AddFuture(workflow.NewTimer(ctx, 3*24*time.Hour), func(f workflow.Future) {})
        selector.Select(ctx)
    }

    if !verified {
        workflow.ExecuteActivity(ctx, MarkInactiveAccount, customer.ID).Get(ctx, nil)
        return fmt.Errorf("email not verified within 10 days")
    }

    // Complete onboarding
    workflow.ExecuteActivity(ctx, ProvisionResources, customer.ID).Get(ctx, nil)
    workflow.ExecuteActivity(ctx, NotifySuccess, customer.ID).Get(ctx, nil)
    return nil
}
Operational Considerations
1. History Size Management:
- Workflows accumulate events over time
- Performance degrades around 10,000+ events
- Use ContinueAsNew for long-running workflows
- Archive completed workflow histories
2. Task Queue Scaling:
- Each task queue is a unit of scaling
- Separate queues for different latency requirements
- Rate limit at queue level for downstream protection
3. Worker Resource Planning:
Workflow Workers:
- Low CPU (mostly waiting for activities)
- Memory proportional to concurrent workflow executions
- Cache workflow state for sticky execution
Activity Workers:
- Resource requirements depend on activity nature
- Separate pools for CPU-bound vs I/O-bound
- Horizontal scaling based on activity latency
4. Monitoring and Observability:
// Key metrics to track
metrics.WorkflowStarted.Inc()
metrics.WorkflowCompleted.Inc()
metrics.WorkflowFailed.Inc()
metrics.ActivityLatency.Observe(latency)
metrics.WorkflowTaskLatency.Observe(latency)
metrics.TaskQueueDepth.Set(depth)
metrics.HistorySize.Observe(eventCount)
Comparison: Temporal vs AWS Step Functions vs Prefect
| Aspect | Temporal | AWS Step Functions | Prefect |
|---|---|---|---|
| Execution Model | Deterministic replay | State machine JSON | Task graph |
| Code Style | Business logic in code | JSON/YAML definition | Python decorators |
| Hosting | Self-hosted or Cloud | Fully managed | Self-hosted or Cloud |
| Complexity Handling | Arbitrary code branching | Limited by JSON DSL | Python control flow |
| Long-Running | Years (history pagination) | Standard: 1 year, Express: 5 min | Task-based, not workflow |
| Pricing | Infrastructure cost | Per state transition | Infrastructure cost |
| Local Dev | Full local dev support | Limited (Step Functions Local) | Full local support |
| Use Case Fit | Complex business logic | Simple state machines | Data/ML pipelines |
Workflow orchestration engines fundamentally change how we approach reliability in distributed systems. Instead of bolting retry logic, state persistence, and failure handling onto existing code, they provide a programming model where durability is the default. The complexity shifts from error handling in business logic to understanding determinism constraints and event sourcing semantics—a worthwhile trade for systems that need to survive infrastructure failures across hours, days, or weeks of execution.