Durable Task Python Patterns
January 30, 2026 · View on GitHub
Detailed implementation patterns for the Durable Task Python SDK.
Function Chaining
Sequential execution where each step depends on the previous:
def hello(ctx: task.ActivityContext, name: str) -> str:
    """Build a greeting for ``name``.

    The activity context ``ctx`` is accepted but not used.
    """
    greeting = f'Hello {name}!'
    return greeting
def chained_workflow(ctx: task.OrchestrationContext, _):
    """Run the ``hello`` activity for three cities, one after another.

    Each activity call is yielded before the next one is scheduled, so the
    calls execute sequentially. Returns the three greetings in call order.
    The orchestration input is ignored.
    """
    greetings = []
    for city in ('Tokyo', 'Seattle', 'London'):
        # Yielding hands control back until this activity has a result.
        greeting = yield ctx.call_activity(hello, input=city)
        greetings.append(greeting)
    return greetings
# Registration: the orchestrator and the activity it calls are both added to
# the worker.
# NOTE(review): `worker` is not defined in this snippet - it is assumed to be
# a Durable Task worker instance created elsewhere.
worker.add_orchestrator(chained_workflow)
worker.add_activity(hello)
Fan-Out/Fan-In
Parallel processing with aggregated results:
import random
from durabletask import task
def get_work_items(ctx: task.ActivityContext, _) -> list[str]:
    """Return between 2 and 10 work item labels.

    The number of items is drawn with ``random.randint(2, 10)``; the labels
    are ``'work item 0'``, ``'work item 1'``, and so on. Neither the context
    nor the input is used.
    """
    return [f'work item {index}' for index in range(random.randint(2, 10))]
def process_work_item(ctx: task.ActivityContext, item: str) -> int:
    """Produce a result for one work item.

    The result is a random integer from 0 to 10 inclusive; ``item`` itself is
    not inspected.
    """
    outcome = random.randint(0, 10)
    return outcome
def fanout_fanin_workflow(ctx: task.OrchestrationContext, _):
    """Process every work item in parallel and aggregate the results.

    Returns a dict with the work items, the per-item results, and the sum of
    the results under ``'total'``. The orchestration input is ignored.
    """
    work_items: list[str] = yield ctx.call_activity(get_work_items)

    # Fan-out: schedule one activity per item without yielding in between,
    # so nothing waits on an earlier item before the next is scheduled.
    pending = []
    for item in work_items:
        pending.append(ctx.call_activity(process_work_item, input=item))

    # Fan-in: a single yield that resumes once every scheduled task is done.
    results: list[int] = yield task.when_all(pending)

    summary = {'work_items': work_items, 'results': results}
    summary['total'] = sum(results)
    return summary
# Registration: the orchestrator plus both activities it schedules.
# NOTE(review): `worker` is not defined in this snippet - it is assumed to be
# a Durable Task worker instance created elsewhere.
worker.add_orchestrator(fanout_fanin_workflow)
worker.add_activity(get_work_items)
worker.add_activity(process_work_item)
Batched Fan-Out (Large Scale)
For large numbers of items, process in batches:
def batched_fanout_workflow(ctx: task.OrchestrationContext, _):
    """Fan out over the work items in batches of 10.

    At most ``batch_size`` activities are scheduled at a time; the next batch
    is only scheduled after the current one has fully completed. Returns
    ``{'total': <sum of all results>}``. The orchestration input is ignored.
    """
    work_items = yield ctx.call_activity(get_work_items)
    batch_size = 10
    collected = []

    start = 0
    while start < len(work_items):
        chunk = work_items[start:start + batch_size]
        pending = [ctx.call_activity(process_work_item, input=entry) for entry in chunk]
        # Wait for the whole batch before moving on to the next slice.
        chunk_results = yield task.when_all(pending)
        collected += chunk_results
        start += batch_size

    return {'total': sum(collected)}
Human Interaction
Workflow that waits for external approval with timeout:
from collections import namedtuple
from dataclasses import dataclass
from datetime import timedelta
from durabletask import task
@dataclass
class Order:
    """Purchase order passed to the approval workflow and its activities."""

    # Compared against the auto-approval threshold and shown in the
    # approval request message.
    cost: float
    # Product name used in the approval and confirmation messages.
    product: str
    # Number of units, used in the confirmation message.
    quantity: int
def send_approval_request(ctx: task.ActivityContext, order: Order) -> None:
    """Announce that ``order`` needs approval.

    The request is written to standard output, naming the product and its
    cost. Nothing is returned.
    """
    message = f"Approval needed for {order.product} (${order.cost})"
    print(message)
def place_order(ctx: task.ActivityContext, order: Order) -> str:
    """Return the confirmation text for an approved ``order``.

    The text has the form ``"Order placed: <quantity>x <product>"``.
    """
    confirmation = f"Order placed: {order.quantity}x {order.product}"
    return confirmation
def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
    """Place ``order``, asking for human approval when it costs 1000 or more.

    Returns ``"Auto-approved"`` for orders under 1000,
    ``"Canceled - approval timeout"`` when no approval arrives within 24
    hours, and otherwise a message naming the approver.
    """
    # Cheap orders skip the approval round-trip entirely.
    if order.cost < 1000:
        return "Auto-approved"

    yield ctx.call_activity(send_approval_request, input=order)

    # Race the external "approval_received" event against a 24-hour timer;
    # when_any resumes with whichever task finishes first.
    approval = ctx.wait_for_external_event("approval_received")
    deadline = ctx.create_timer(timedelta(hours=24))
    first_done = yield task.when_any([approval, deadline])

    if first_done == deadline:
        return "Canceled - approval timeout"

    # The approval event won the race: place the order, then report who
    # approved it using the event payload.
    yield ctx.call_activity(place_order, input=order)
    approver = approval.get_result().approver
    return f"Approved by '{approver}'"
# Raising the approval event from client
# `Approval` carries the `approver` field that purchase_order_workflow reads
# from the event result.
# NOTE(review): `client` and `instance_id` are not defined in this snippet -
# they are assumed to come from the code that scheduled the orchestration.
Approval = namedtuple("Approval", ["approver"])
approval_data = Approval("manager@company.com")
# The event name must match the one the orchestrator waits on
# ("approval_received").
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)
Durable Timers
Schedule delayed execution:
from datetime import timedelta
def delayed_workflow(ctx: task.OrchestrationContext, _):
    """Run one activity, pause for five minutes, then run a second activity.

    Returns ``"Done"`` once the second activity has completed. The
    orchestration input is ignored.
    """
    yield ctx.call_activity(start_activity)

    # Durable timer rather than a sleep: the wait survives restarts.
    pause = timedelta(minutes=5)
    yield ctx.create_timer(pause)

    yield ctx.call_activity(continue_activity)
    return "Done"
Scheduled Execution
Execute at a specific time:
from datetime import datetime, timedelta
def scheduled_workflow(ctx: task.OrchestrationContext, scheduled_time: datetime):
    """Run ``scheduled_activity`` no earlier than ``scheduled_time``.

    If ``scheduled_time`` is already in the past (or is now), the activity
    runs immediately. Returns the activity's result.
    """
    # Measure against the orchestration's own clock, not the wall clock.
    remaining = scheduled_time - ctx.current_utc_datetime
    if remaining > timedelta(0):
        yield ctx.create_timer(remaining)

    outcome = yield ctx.call_activity(scheduled_activity)
    return outcome
Sub-Orchestrations
Compose orchestrations from smaller pieces:
def child_orchestration(ctx: task.OrchestrationContext, data: str):
    """Pipe ``data`` through ``activity_a`` and then ``activity_b``.

    The output of the first activity is the input of the second; the second
    activity's result is returned.
    """
    intermediate = yield ctx.call_activity(activity_a, input=data)
    final = yield ctx.call_activity(activity_b, input=intermediate)
    return final
def parent_orchestration(ctx: task.OrchestrationContext, items: list[str]):
    """Start one ``child_orchestration`` per item and collect their results.

    All children are scheduled before any of them is awaited, so they run in
    parallel. Returns whatever ``task.when_all`` yields for the children.
    """
    children = []
    for item in items:
        children.append(ctx.call_sub_orchestrator(child_orchestration, input=item))

    # Single yield: resumes once every child orchestration has finished.
    outcomes = yield task.when_all(children)
    return outcomes
# Registration - both must be registered
# The activities called by the child orchestration are registered as well.
# NOTE(review): `worker`, `activity_a` and `activity_b` are not defined in
# this snippet - they are assumed to exist elsewhere.
worker.add_orchestrator(parent_orchestration)
worker.add_orchestrator(child_orchestration)
worker.add_activity(activity_a)
worker.add_activity(activity_b)
Durable Entities
Stateful objects with operations:
Function-Based Entity
from durabletask import entities
def counter(ctx: entities.EntityContext, input: int):
"""Function-based entity for a counter"""
state = ctx.get_state(int, 0) # Get state with default 0
if ctx.operation == "add":
state += input
ctx.set_state(state)
elif ctx.operation == "subtract":
state -= input
ctx.set_state(state)
elif ctx.operation == "get":
return state
elif ctx.operation == "reset":
ctx.set_state(0)
Class-Based Entity
from durabletask import entities
class Counter(entities.DurableEntity):
    """Class-based entity for a counter.

    The count is an ``int`` read with ``get_state(int, 0)``, so a counter that
    has never been written reads as 0.
    """

    # Deliberately no ``__init__`` that calls ``set_state(0)``: writing state
    # from the constructor would overwrite a previously stored count every
    # time the class is instantiated, and every operation below already falls
    # back to 0 through ``get_state(int, 0)``.
    # NOTE(review): the SDK appears to attach the entity context only after
    # construction, in which case ``set_state`` inside ``__init__`` would fail
    # outright - confirm against the durabletask entity implementation.

    def add(self, amount: int):
        """Increase the stored count by ``amount``."""
        current = self.get_state(int, 0)
        self.set_state(current + amount)

    def subtract(self, amount: int):
        """Decrease the stored count by ``amount``."""
        current = self.get_state(int, 0)
        self.set_state(current - amount)

    def get(self) -> int:
        """Return the stored count (0 if nothing has been stored yet)."""
        return self.get_state(int, 0)

    def reset(self):
        """Set the stored count back to 0."""
        self.set_state(0)
Using Entities from Orchestrations
def workflow_with_entity(ctx: task.OrchestrationContext, _):
    """Add 5 to the ``my-counter`` entity, then read its value.

    Returns ``"Counter value: <value>"``. The orchestration input is ignored.
    """
    counter_id = entities.EntityInstanceId("counter", "my-counter")

    # Fire-and-forget: the signal is not yielded, so no result is awaited.
    # Note the keyword names here: ``entity_id`` / ``operation_name``.
    ctx.signal_entity(entity_id=counter_id, operation_name="add", input=5)

    # Request/response: yielded, so the orchestration waits for the value.
    # Note the different keyword names: ``entity`` / ``operation``.
    current = yield ctx.call_entity(entity=counter_id, operation="get")
    return f"Counter value: {current}"
Using Entities from Client
# Send an "add" operation with input 10 to the entity identified by
# ("counter", "my-counter").
# NOTE(review): `client` is not defined in this snippet; the two
# EntityInstanceId arguments are presumably the entity name and its key -
# confirm against the SDK.
entity_id = entities.EntityInstanceId("counter", "my-counter")
client.signal_entity(entity_id, "add", input=10)
Entity Locking
Ensure exclusive access to multiple entities:
def workflow_with_locks(ctx: task.OrchestrationContext, _):
    """Move 100 from ``account-1`` to ``account-2`` while holding both locks.

    Returns ``"Transfer complete"``. The orchestration input is ignored.
    """
    source = entities.EntityInstanceId("account", "account-1")
    target = entities.EntityInstanceId("account", "account-2")
    amount = 100

    # Both entities are locked for the duration of the ``with`` block.
    with (yield ctx.lock_entities([source, target])):
        # The balances are read under the lock; this sample does not use
        # them further. call_entity takes ``entity`` / ``operation`` keywords.
        source_balance = yield ctx.call_entity(entity=source, operation="get_balance")
        target_balance = yield ctx.call_entity(entity=target, operation="get_balance")

        yield ctx.call_entity(entity=source, operation="withdraw", input=amount)
        yield ctx.call_entity(entity=target, operation="deposit", input=amount)

    return "Transfer complete"
Eternal Orchestrations (Continue-As-New)
Long-running processes that periodically restart:
from datetime import timedelta
def eternal_orchestration(ctx: task.OrchestrationContext, iteration: int):
    """Do one round of periodic work, wait five minutes, then restart.

    ``iteration`` is passed to the ``periodic_work`` activity and is
    incremented by one for the next run.
    """
    yield ctx.call_activity(periodic_work, input=iteration)

    # Pause between rounds.
    yield ctx.create_timer(timedelta(minutes=5))

    # Restarting with fresh input instead of looping keeps the history from
    # growing without bound.
    next_iteration = iteration + 1
    ctx.continue_as_new(next_iteration)
# Start with iteration 0
# NOTE(review): `client` is not defined in this snippet - it is assumed to be
# a Durable Task client created elsewhere.
client.schedule_new_orchestration(eternal_orchestration, input=0)
Monitoring Pattern
Periodic polling with flexible exit conditions:
from datetime import timedelta
def monitoring_workflow(ctx: task.OrchestrationContext, job_id: str):
    """Poll ``check_job_status`` for ``job_id`` up to 10 times, 30 s apart.

    Returns a dict with a ``"status"`` of ``"success"``, ``"failed"`` or
    ``"timeout"`` and the number of ``"attempts"`` made.
    """
    max_attempts = 10
    polling_interval = timedelta(seconds=30)

    for attempts_made in range(1, max_attempts + 1):
        status = yield ctx.call_activity(check_job_status, input=job_id)

        # Both terminal states end the monitor immediately.
        if status == "completed":
            return {"status": "success", "attempts": attempts_made}
        elif status == "failed":
            return {"status": "failed", "attempts": attempts_made}

        # Any other status: wait one interval, then poll again.
        yield ctx.create_timer(polling_interval)

    return {"status": "timeout", "attempts": max_attempts}
Version-Aware Orchestration
Handle breaking changes gracefully using orchestration versioning. Version is set when scheduling the orchestration and read via ctx.version.
Setting Version When Scheduling
# Schedule orchestration with a specific version
# The orchestrator is referenced by its name string here, and the returned
# value is kept as `instance_id`.
# NOTE(review): `client` is not defined in this snippet - it is assumed to be
# a Durable Task client created elsewhere.
instance_id = client.schedule_new_orchestration(
"versioned_orchestration",
input="data",
version="2.0.0" # Version is set here
)
Version Comparison Helper
Use the third-party packaging library (PEP 440 version parsing) for version comparison:
from packaging import version
def compare_version(v1: str | None, v2: str) -> int:
    """Compare two version strings.

    Args:
        v1: The version to test; ``None`` is treated as lower than any ``v2``.
        v2: The version to compare against.

    Returns:
        -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2.

    If either string cannot be parsed as a version, the two are compared as
    plain strings instead (lexicographically, so "10" sorts before "9").
    """
    if v1 is None:
        return -1

    # Only the parsing can legitimately fail, so only the parsing is guarded,
    # and only against the parser's own error. Any other exception is a real
    # bug and is allowed to propagate.
    try:
        ver1 = version.parse(v1)
        ver2 = version.parse(v2)
    except version.InvalidVersion:
        # Fall back to string comparison
        return (v1 > v2) - (v1 < v2)

    return (ver1 > ver2) - (ver1 < ver2)
Versioned Orchestration Example
def versioned_orchestration(ctx: task.OrchestrationContext, name: str):
    """Greet ``name``, adding a goodbye for instances at version 2.0.0 or later.

    Version history:
    - v1.0.0: Basic hello greeting
    - v2.0.0: Added goodbye greeting

    Returns a dict with the instance's ``"version"`` and the list of activity
    ``"results"``.
    """
    # The version is the one supplied when this instance was scheduled.
    orch_version = ctx.version

    # Present in every version.
    greeting = yield ctx.call_activity(say_hello, input=name)
    results = [greeting]

    # Gate the newer step on the instance's version so instances scheduled
    # under an older version do not take this branch.
    if compare_version(orch_version, "2.0.0") >= 0:
        farewell = yield ctx.call_activity(say_goodbye, input=name)
        results.append(farewell)

    return {"version": orch_version, "results": results}
Why Versioning Matters
Without versioning, changing orchestration logic causes non-deterministic errors:
- v1 orchestration starts (calls activity A, then B)
- You deploy v2 code (calls A, C, then B)
- v1 orchestration replays but hits new code path
- ERROR: History doesn't match - expected B, got C
With versioning:
- v1 orchestrations continue using v1 code path
- v2 orchestrations use v2 code path
- Both run on the same worker without conflict