Duroxide Orchestration Writing Guide
June 8, 2026 · View on GitHub
For: LLMs and humans building durable workflows with Duroxide
Reference: See tests/e2e_samples.rs and examples/ for complete working examples
Table of Contents
- Quick Start
- Core Concepts
- Determinism Rules
- API Reference
- Client API Reference
- Common Patterns
- Anti-Patterns (What to Avoid)
- Complete Examples
- Orchestration Versioning
- Testing Your Orchestrations
Quick Start
Minimum Viable Orchestration
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry, Client};
use duroxide::runtime::{self, registry::ActivityRegistry};
use duroxide::providers::sqlite::SqliteProvider;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. Create storage
let store = Arc::new(SqliteProvider::new("sqlite::memory:", None).await?);
// 2. Register activities (side-effecting work)
let activities = ActivityRegistry::builder()
.register("Greet", |ctx: ActivityContext, name: String| async move {
Ok(format!("Hello, {}!", name))
})
.build();
// 3. Define orchestration (deterministic coordination)
let orchestration = |ctx: OrchestrationContext, name: String| async move {
let greeting = ctx.schedule_activity("Greet", name).await?;
Ok(greeting)
};
// 4. Register and start runtime
let orchestrations = OrchestrationRegistry::builder()
.register("HelloWorld", orchestration)
.build();
let rt = runtime::Runtime::start_with_store(
store.clone(),
activities,
orchestrations
).await;
// 5. Start an instance
let client = Client::new(store);
client.start_orchestration("my-instance-1", "HelloWorld", "World").await?;
// 6. Wait for completion
let status = client.wait_for_orchestration(
"my-instance-1",
std::time::Duration::from_secs(5)
).await?;
println!("Result: {:?}", status);
rt.shutdown(None).await;
Ok(())
}
Core Concepts
Orchestrations vs Activities
Orchestrations (deterministic coordination logic):
- Written as async functions:
|ctx: OrchestrationContext, input: T| async move { ... } - Must be deterministic (same input → same decisions)
- Only schedule work, don't do work themselves
- Can use: if/else, loops, match, any Rust control flow
- CANNOT use: random, time, I/O, non-deterministic APIs
- Purpose: Coordinate multi-step workflows, contain business logic
Activities (single-purpose execution units):
- Stateless async functions:
|ctx: ActivityContext, input: T| async move { ... } - Can do anything: database, HTTP, file I/O, polling, VM provisioning, etc.
- CAN sleep/poll internally as part of their work (e.g., waiting for VM to provision)
- Should be idempotent (tolerate retries)
- Should be single-purpose (do ONE thing well)
- Executed by worker dispatcher (can fail and retry)
- Return
Result<String, String>(success or error) - First parameter
ActivityContextprovides logging and metadata access
Key Principle: Separation of Concerns
- Activities = What to do (execution)
- Orchestrations = When and how to do it (coordination)
Mental Model:
- Orchestration = conductor (coordinates the orchestra, decides when each musician plays)
- Activity = musician (actually plays the music, focuses on playing well)
- Pull multi-step logic UP to orchestrations, keep activities focused
Example:
// ✅ GOOD: Activity polls for VM readiness (single-purpose: provision VM)
// Note: Long-running activities (minutes to hours) are fully supported via automatic lock renewal
activities.register("ProvisionVM", |ctx: ActivityContext, config| async move {
let vm = create_vm(config).await?;
while !vm_ready(&vm).await {
tokio::time::sleep(Duration::from_secs(5)).await; // ✅ Internal polling is fine
}
Ok(vm.id)
});
// ✅ GOOD: Orchestration coordinates multiple activities
async fn deploy_app(ctx: OrchestrationContext, config: String) -> Result<String, String> {
let vm_id = ctx.schedule_activity("ProvisionVM", config.clone()).await?;
let app_id = ctx.schedule_activity("DeployApp", vm_id).await?;
let health = ctx.schedule_activity("HealthCheck", app_id.clone()).await?;
if health != "healthy" {
// ✅ Business logic in orchestration
ctx.schedule_activity("Rollback", app_id).await?;
return Err("Deployment unhealthy".to_string());
}
Ok(app_id)
}
Cooperative Activity Cancellation
When an orchestration is cancelled or reaches a terminal state, in-flight activities receive a cancellation signal via ActivityContext. Activities can choose how to respond:
Cooperative Activities (Recommended)
Activities should check for cancellation during long-running work:
use duroxide::ActivityContext;
activities.register("LongRunningTask", |ctx: ActivityContext, input: String| async move {
for i in 0..100 {
// ✅ BEST PRACTICE: Check cancellation before each unit of work
if ctx.is_cancelled() {
ctx.trace_info("Received cancellation, cleaning up...".to_string());
cleanup_partial_work().await;
return Err("Cancelled".to_string());
}
do_work_chunk(i).await;
}
Ok("completed".to_string())
});
Using select! for Cancellation-Aware I/O
For activities that wait on external resources, use select! with ctx.cancelled():
activities.register("WaitForResource", |ctx: ActivityContext, resource_id: String| async move {
tokio::select! {
// Wait for cancellation signal
_ = ctx.cancelled() => {
ctx.trace_info("Cancelled while waiting for resource".to_string());
Err("Cancelled".to_string())
}
// Or complete the actual work
result = fetch_resource(&resource_id) => {
result.map_err(|e| e.to_string())
}
}
});
Non-Cooperative Activities
If an activity ignores the cancellation signal:
- The runtime waits for a configurable grace period (default: 30 seconds)
- After the grace period, the activity is forcibly aborted
- The activity's result (success or failure) is dropped since the orchestration is already terminal
Best Practices for Long-Running Activities
// ✅ GOOD: Polling activity with cancellation support
activities.register("ProvisionVM", |ctx: ActivityContext, config: String| async move {
let vm = create_vm(&config).await?;
// Poll for readiness with cancellation check
loop {
if ctx.is_cancelled() {
ctx.trace_info("Cancelling VM provisioning".to_string());
cleanup_vm(&vm.id).await;
return Err("Cancelled during provisioning".to_string());
}
if vm_ready(&vm.id).await {
return Ok(vm.id);
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
});
// ✅ GOOD: HTTP client with cancellation-aware timeout
activities.register("CallExternalAPI", |ctx: ActivityContext, request: String| async move {
let client = reqwest::Client::new();
tokio::select! {
_ = ctx.cancelled() => {
Err("Request cancelled".to_string())
}
response = client.post("https://api.example.com")
.body(request)
.timeout(Duration::from_secs(30))
.send() => {
response
.map_err(|e| e.to_string())?
.text()
.await
.map_err(|e| e.to_string())
}
}
});
ActivityContext Cancellation API
| Method | Description |
|---|---|
ctx.is_cancelled() | Returns true if cancellation was requested |
ctx.cancelled() | Returns a Future that completes when cancelled (use in select!) |
The Replay Model
Why replay? Orchestrations can run for hours/days/months and must survive crashes.
How it works:
- First execution: Orchestration runs, schedules activities, decisions recorded as events
- After crash/restart: Orchestration code runs again, but replays from history
- Awaits resolve from history: No duplicate side effects, fast forward to where we left off
Example:
async fn my_orchestration(ctx: OrchestrationContext, input: String) -> Result<String, String> {
// Turn 1 (first execution):
let result = ctx.schedule_activity("Step1", input).await?;
// ↑ Schedules Step1, waits for completion
// [CRASH happens here]
// Turn 2 (after restart - REPLAY):
let result = ctx.schedule_activity("Step1", input).await?;
// ↑ Doesn't re-schedule! Reads Step1 completion from history
let result2 = ctx.schedule_activity("Step2", result).await?;
// ↑ Schedules Step2 for FIRST time (not in history yet)
Ok(result2)
}
Scheduling Work
All schedule_* methods return futures that can be awaited directly:
// Activities return Result<String, String>
let result = ctx.schedule_activity("Task", "input").await?;
// Timers return ()
ctx.schedule_timer(Duration::from_secs(5)).await;
// External events return String
let data = ctx.schedule_wait("EventName").await;
// Sub-orchestrations return Result<String, String>
let child_result = ctx.schedule_sub_orchestration("Child", "input").await?;
Determinism Rules
✅ DO (Safe in Orchestrations)
async fn safe_orchestration(ctx: OrchestrationContext, count: i32) -> Result<String, String> {
// ✅ Control flow
if count > 10 {
return Ok("too many".to_string());
}
// ✅ Loops
for i in 0..count {
ctx.schedule_activity("Process", i.to_string()).await?;
}
// ✅ Pattern matching
let status = ctx.schedule_activity("GetStatus", "").await?;
match status.as_str() {
"ready" => { /* ... */ }
"pending" => { /* ... */ }
_ => { /* ... */ }
}
// ✅ Timers for delays
ctx.schedule_timer(std::time::Duration::from_secs(5)).await;
// ✅ External events for signals
let approval = ctx.schedule_wait("ApprovalEvent").await;
// ✅ Logging (replay-safe)
ctx.trace_info("Step completed");
// ✅ Replay-safe GUIDs
let id = ctx.new_guid().await?;
// ✅ Deterministic timestamps
let now = ctx.utc_now().await?;
let now_ms = now
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| e.to_string())?
.as_millis() as u64;
// ✅ KV store (durable state)
ctx.set_kv_value("progress", "step_3");
let val = ctx.get_kv_value("progress");
Ok("done".to_string())
}
❌ DON'T (Non-Deterministic)
async fn unsafe_orchestration(ctx: OrchestrationContext, _input: String) -> Result<String, String> {
// ❌ Random numbers
let random = rand::random::<u64>(); // Different on each replay!
// ❌ System time
let now = std::time::SystemTime::now(); // Different on each replay!
// ❌ I/O in orchestration
let file_contents = std::fs::read_to_string("data.txt")?; // Side effect!
// ❌ HTTP in orchestration
let response = reqwest::get("https://api.example.com").await?; // Side effect!
// ❌ Sleep in orchestration
tokio::time::sleep(std::time::Duration::from_secs(5)).await; // Use ctx.schedule_timer() instead!
Ok("unsafe".to_string())
}
Instead, use:
async fn safe_version(ctx: OrchestrationContext, _input: String) -> Result<String, String> {
// ✅ Random → Use ctx.new_guid() or activity
let id = ctx.new_guid().await?;
// ✅ Time → Use ctx.utc_now()
let now = ctx.utc_now().await?;
// ✅ I/O → Use activities
let file_contents = ctx.schedule_activity("ReadFile", "data.txt").await?;
// ✅ HTTP → Use activities
let response = ctx.schedule_activity("HttpGet", "https://api.example.com").await?;
// ✅ Sleep → Use timers
ctx.schedule_timer(std::time::Duration::from_secs(5)).await;
// ✅ Activity
let result = ctx.schedule_activity("Task", "input").await?;
Ok(result)
}
API Reference
OrchestrationContext Methods
Scheduling Activities
// String I/O (basic)
fn schedule_activity(&self, name: impl Into<String>, input: impl Into<String>)
-> impl Future<Output = Result<String, String>>
// Typed I/O (with serde)
fn schedule_activity_typed<In: Serialize, Out: DeserializeOwned>(
&self,
name: impl Into<String>,
input: &In
) -> impl Future<Output = Result<Out, String>>
// Usage:
let result = ctx.schedule_activity("ProcessOrder", order_json).await?;
let result: OrderResult = ctx.schedule_activity_typed("ProcessOrder", &order_data).await?;
Scheduling Activities with Session Affinity
Route activities to the worker process that owns the given session. All activities
with the same session_id are dispatched to the same worker for as long as the
session is owned. This enables in-memory state reuse across activity invocations.
// String I/O
fn schedule_activity_on_session(
&self,
name: impl Into<String>,
input: impl Into<String>,
session_id: impl Into<String>,
) -> DurableFuture<Result<String, String>>
// Typed I/O (with serde)
fn schedule_activity_on_session_typed<In: Serialize, Out: DeserializeOwned>(
&self,
name: impl Into<String>,
input: &In,
session_id: impl Into<String>,
) -> impl Future<Output = Result<Out, String>>
// Usage:
let session_id = ctx.new_guid().await?; // Random, replay-safe
let result = ctx.schedule_activity_on_session("RunTurn", &input, &session_id).await?;
Key properties of sessions:
- Pure routing — no ordering guarantees, no transactional semantics
- Re-claimable — if the worker dies, another worker can claim the session after lock expiry
- Implicit lifecycle — no create/close APIs; sessions are created on first use, cleaned up when idle
- Independent of activity locks — an in-flight activity can still complete even if the session was reassigned (like an in-flight network packet surviving a flow table expiry)
See the Session Affinity Pattern section below for usage patterns.
Activity Tags (Worker Specialization)
Tags route activities to specialized worker pools. An orchestration marks an
activity with a tag using .with_tag(), and the worker runtime filters its
queue to only dequeue matching tags.
// Tag an activity for a GPU worker pool
let result = ctx.schedule_activity("RenderFrame", input)
.with_tag("gpu")
.await?;
// Tags work with typed activities too
let result: RenderResult = ctx.schedule_activity_typed("RenderFrame", &input)
.with_tag("gpu")
.await?;
// Tags can be combined with sessions
let result = ctx.schedule_activity_on_session("CacheLookup", input, &session_id)
.with_tag("cache-node")
.await?;
Worker-side configuration:
use duroxide::{RuntimeOptions, TagFilter};
// Worker that only processes GPU-tagged activities
let opts = RuntimeOptions {
worker_tag_filter: TagFilter::tags(["gpu"]),
..Default::default()
};
// Worker that processes untagged + GPU-tagged activities
let opts = RuntimeOptions {
worker_tag_filter: TagFilter::default_and(["gpu"]),
..Default::default()
};
TagFilter variants:
| Variant | Behavior |
|---|---|
TagFilter::DefaultOnly (default) | Only untagged activities |
TagFilter::tags(["a", "b"]) | Only activities tagged "a" or "b" |
TagFilter::default_and(["a"]) | Untagged or tagged "a" |
TagFilter::Any | All activities regardless of tag (generalist worker) |
TagFilter::None | Never matches (no activities fetched) |
Key properties:
- Tags are persisted in event history — replays see the exact same tag
.with_tag()is a mutate-after-emit pattern that updates the already-emitted action- Tags are purely a routing concern — they don't affect activity behavior
- Tags are per-activity, not per-orchestration — different activities in the same orchestration can have different tags
- Maximum 5 distinct tags per
TagFilter(enforced at construction)
⚠️ Starvation warning: If you tag an activity "gpu" but no running worker has a matching TagFilter, the activity sits in the queue indefinitely — no timeout, no error. Protect against this with a select2 timeout:
let activity = ctx.schedule_activity("GpuRender", input).with_tag("gpu");
let timeout = ctx.schedule_timer(Duration::from_secs(30));
match ctx.select2(activity, timeout).await {
Either2::First(Ok(result)) => Ok(result),
Either2::First(Err(e)) => Err(e),
Either2::Second(()) => Err("No GPU worker available".to_string()),
}
⚠️ Replay determinism: Changing, adding, or removing a .with_tag() call in orchestration code will cause a nondeterminism error on replay of any in-flight instance whose history was recorded with the old tag. This follows the same rules as renaming activities — the tag is part of the event identity.
Scheduling Activities with Retry
// With retry policy (string I/O)
async fn schedule_activity_with_retry(
&self,
name: impl Into<String>,
input: impl Into<String>,
policy: RetryPolicy,
) -> Result<String, String>
// Typed variant
async fn schedule_activity_with_retry_typed<In, Out>(
&self,
name: impl Into<String>,
input: &In,
policy: RetryPolicy,
) -> Result<Out, String>
// Usage:
use duroxide::{RetryPolicy, BackoffStrategy};
// Simple: 3 attempts with default exponential backoff
let result = ctx.schedule_activity_with_retry("Task", input, RetryPolicy::new(3)).await?;
// Custom: 5 attempts, fixed 1s backoff, 30s per-attempt timeout
let policy = RetryPolicy::new(5)
.with_timeout(Duration::from_secs(30))
.with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) });
let result = ctx.schedule_activity_with_retry("Task", input, policy).await?;
Scheduling Activities with Retry on Session
Combines retry semantics with session affinity — all retry attempts are pinned to the same worker session.
// String I/O
async fn schedule_activity_with_retry_on_session(
&self,
name: impl Into<String>,
input: impl Into<String>,
policy: RetryPolicy,
session_id: impl Into<String>,
) -> Result<String, String>
// Typed variant
async fn schedule_activity_with_retry_on_session_typed<In, Out>(
&self,
name: impl Into<String>,
input: &In,
policy: RetryPolicy,
session_id: impl Into<String>,
) -> Result<Out, String>
// Usage:
let session = ctx.new_guid().await?;
let policy = RetryPolicy::new(3)
.with_backoff(BackoffStrategy::Fixed { delay: Duration::from_secs(1) });
let result = ctx.schedule_activity_with_retry_on_session(
"RunQuery", "SELECT 1", policy, &session,
).await?;
Scheduling Timers
// Delay as Duration
fn schedule_timer(&self, delay: Duration) -> impl Future<Output = ()>
// Usage:
ctx.schedule_timer(Duration::from_secs(5)).await; // Wait 5 seconds
ctx.schedule_timer(Duration::from_secs(60)).await; // Wait 1 minute
External Events (Ephemeral)
// Wait for external signal by name (one-shot, positional matching)
fn schedule_wait(&self, name: impl Into<String>) -> impl Future<Output = String>
// Usage:
let approval_data = ctx.schedule_wait("ApprovalEvent").await;
// Typed version
let approval: ApprovalData = ctx.schedule_wait_typed("ApprovalEvent").await;
Event Queues (Persistent/FIFO)
Durable FIFO queues that survive continue_as_new boundaries. Unlike ephemeral
events, queue messages are consumed in order and don't require a matching
subscription before the event is raised.
Important: Events must be enqueued after the orchestration has started. Events enqueued before
start_orchestrationis called may be dropped by the provider (orphan message semantics). Once the orchestration is running, queued events are buffered until consumed — even if nodequeue_eventcall is active.
// Dequeue next message from a named queue (blocks until available)
fn dequeue_event(&self, queue: impl Into<String>) -> impl Future<Output = String>
// Typed variant (auto-deserializes)
fn dequeue_event_typed<T: DeserializeOwned>(&self, queue: impl Into<String>) -> impl Future<Output = T>
// Usage:
let msg = ctx.dequeue_event("inbox").await;
// Typed:
let msg: ChatMessage = ctx.dequeue_event_typed("inbox").await;
Queue vs Ephemeral events:
| Feature | schedule_wait | dequeue_event |
|---|---|---|
| Semantics | Positional (Nth wait matches Nth raise) | FIFO queue |
| Survives CAN | No | Yes |
| Pre-subscription required | Yes | No |
| Pre-start messages | N/A | Dropped (enqueue only after orchestration starts) |
| Use case | One-shot signals, approvals | Chat messages, command streams |
Custom Status
Publish progress or structured state visible to external clients via polling.
// Set custom status (string payload, typically JSON)
fn set_custom_status(&self, status: impl Into<String>)
// Clear custom status back to None
fn reset_custom_status(&self)
// Read current custom status (reflects all set/reset calls across turns)
fn get_custom_status(&self) -> Option<String>
// Usage:
ctx.set_custom_status(&serde_json::to_string(&MyProgress { pct: 50 }).unwrap());
Custom status changes are recorded as CustomStatusUpdated history events, making them
durable and replayable. The status survives continue_as_new — the last value is
carried forward to the new execution. Clients read it via wait_for_status_change()
(see Client API).
KV Store (Durable Key-Value State)
Store per-instance key-value pairs that survive replay, continue_as_new, and are
visible to external clients. Use for progress tracking, configuration, or cross-instance
communication.
// Set a value (fire-and-forget, no await needed)
ctx.set_kv_value("progress", "step_3_of_5");
// Set with automatic JSON serialization
ctx.set_kv_value_typed("config", &MyConfig { retries: 3 });
// Read local KV state (replay-safe, returns current snapshot)
let val: Option<String> = ctx.get_kv_value("progress");
// Read with automatic deserialization
let config: Option<MyConfig> = ctx.get_kv_value_typed::<MyConfig>("config").unwrap();
// Read all KV entries as a snapshot
let all: HashMap<String, String> = ctx.get_kv_all_values();
let keys: Vec<String> = ctx.get_kv_all_keys();
let count: usize = ctx.get_kv_length();
// Read KV from another running instance (cross-instance, requires await)
let other_val = ctx.get_kv_value_from_instance("other-instance-id", "status").await?;
// Clear a single key
ctx.clear_kv_value("progress");
// Clear all keys for this instance
ctx.clear_all_kv_values();
// Prune old keys by timestamp (useful in long-running / ContinueAsNew orchestrations)
let removed = ctx.prune_kv_values_updated_before(cutoff_ms);
Behavior:
- KV operations (
set_kv_value,clear_kv_value,clear_all_kv_values) are fire-and-forget — they don't suspend the orchestration - Values survive
continue_as_new(carried to the new execution) - Values are visible to external clients via
client.get_kv_value(instance, key)orclient.get_kv_all_values(instance) - Local reads (
get_kv_value,get_kv_all_values) return the current in-memory snapshot (replay-safe) - Cross-instance reads (
get_kv_value_from_instance) are async and hit the provider prune_kv_values_updated_before(cutoff_ms)removes keys withlast_updated_at_ms < cutoff_ms, including keys set in the current turn.- Limits: Max 100 keys per instance (
MAX_KV_KEYS), max 16KB per value (MAX_KV_VALUE_BYTES) - Exceeding limits fails the orchestration turn
Sub-Orchestrations
// Start child orchestration
fn schedule_sub_orchestration(&self, name: impl Into<String>, input: impl Into<String>)
-> impl Future<Output = Result<String, String>>
// Usage:
let child_result = ctx.schedule_sub_orchestration("ChildWorkflow", input_json).await?;
// Typed version
let child_result: ChildOutput = ctx.schedule_sub_orchestration_typed("ChildWorkflow", &child_input).await?;
Detached Orchestrations (Fire-and-Forget)
// Start orchestration without waiting for result
fn schedule_orchestration(&self, name: impl Into<String>, instance: impl Into<String>, input: impl Into<String>)
// Usage:
ctx.schedule_orchestration("EmailNotification", "email-123", email_json);
// Returns immediately, doesn't wait for completion
Composition (Select/Join)
// Select: first to complete wins, returns Either2 or Either3
async fn select2<T1, T2, F1, F2>(&self, f1: F1, f2: F2) -> Either2<T1, T2>
async fn select3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> Either3<T1, T2, T3>
// Join: wait for all
async fn join<T, F>(&self, futures: Vec<F>) -> Vec<T>
async fn join3<T1, T2, T3, F1, F2, F3>(&self, f1: F1, f2: F2, f3: F3) -> (T1, T2, T3)
// Usage:
let timer = ctx.schedule_timer(Duration::from_secs(30));
let approval = ctx.schedule_wait("Approval");
match ctx.select2(timer, approval).await {
Either2::First(()) => println!("Timed out"),
Either2::Second(data) => println!("Approved: {}", data),
}
// Join example
let f1 = ctx.schedule_activity("Task1", "a");
let f2 = ctx.schedule_activity("Task2", "b");
let f3 = ctx.schedule_activity("Task3", "c");
let results = ctx.join(vec![f1, f2, f3]).await; // Wait for all 3
Continue As New
// End current execution, start fresh with new input
// Returns a future that never resolves - use with `return` to terminate
fn continue_as_new(&self, input: impl Into<String>) -> impl Future<Output = Result<String, String>>
// Usage:
async fn pagination(ctx: OrchestrationContext, cursor: String) -> Result<String, String> {
let next_cursor = ctx.schedule_activity("ProcessPage", cursor).await?;
if next_cursor == "EOF" {
Ok("done".to_string())
} else {
return ctx.continue_as_new(next_cursor).await; // Terminates execution
}
}
Important: continue_as_new() returns a future that never resolves. You must await it and use return to ensure code after is unreachable. The compiler will warn about unreachable code if you forget return.
Note: Each Continue As New starts a new execution with an empty history. The runtime stamps a fresh OrchestrationStarted { event_id: 1 } for the new execution; activities, timers, and external events always target the currently active execution only.
Logging (Replay-Safe)
fn trace_info(&self, message: impl Into<String>)
fn trace_warn(&self, message: impl Into<String>)
fn trace_error(&self, message: impl Into<String>)
fn trace_debug(&self, message: impl Into<String>)
// Usage:
ctx.trace_info("Processing order 12345");
ctx.trace_warn(format!("Retry attempt {}", attempt));
ctx.trace_error("Payment failed");
Default log targets The runtime installs a filter equivalent to
error,duroxide::orchestration={level},duroxide::activity={level}(where{level}comes fromObservabilityConfig.log_level). This keeps stdout focused on orchestration and activity trace helpers. To opt into additional targets—such as the internal runtime dispatcher logs—overrideRUST_LOGbefore starting your app:RUST_LOG=error,duroxide::orchestration=info,duroxide::activity=info,duroxide::runtime=debug cargo run --example with_observabilityYou can add any other targets in the same way (for example
duroxide::providers::sqlite=trace). SettingRUST_LOGalways takes precedence over the runtime defaults.
System Calls (Deterministic Non-Determinism)
// Generate a random (UUID v4) GUID, recorded in history so it is replay-stable
async fn new_guid(&self) -> Result<String, String>
// Get deterministic UTC timestamp
async fn utc_now(&self) -> Result<std::time::SystemTime, String>
// Usage:
let correlation_id = ctx.new_guid().await?;
let created_at = ctx.utc_now().await?;
let created_at_ms = created_at
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| e.to_string())?
.as_millis() as u64;
Client API Reference
The Client provides control-plane operations for managing orchestration instances. It communicates with the runtime through the shared Provider (no direct coupling).
Creating a Client
use duroxide::Client;
use duroxide::providers::sqlite::SqliteProvider;
use std::sync::Arc;
let store = Arc::new(SqliteProvider::new("sqlite:./data.db", None).await?);
let client = Client::new(store);
Key Points:
- Client shares the same Provider instance as the Runtime
- Client is
Cloneand thread-safe - Can be used from any process, even without a running Runtime
Core Operations
Start an Orchestration
// Basic string input
client.start_orchestration("order-123", "ProcessOrder", r#"{"customer_id": "c1"}"#).await?;
// With explicit version
client.start_orchestration_versioned("order-456", "ProcessOrder", "2.0.0", "{}").await?;
// With typed input
#[derive(Serialize)]
struct OrderInput {
customer_id: String,
amount: f64,
}
let input = OrderInput {
customer_id: "c1".to_string(),
amount: 99.99,
};
client.start_orchestration_typed("order-789", "ProcessOrder", &input).await?;
Behavior:
- Enqueues a
StartOrchestrationwork item - Runtime picks it up and begins execution
- Returns immediately (non-blocking)
- Use
wait_for_orchestration()to wait for completion
Raise External Event
// String payload
client.raise_event("order-123", "PaymentReceived", r#"{"amount": 99.99}"#).await?;
// Typed payload
#[derive(Serialize)]
struct PaymentEvent {
transaction_id: String,
amount: f64,
}
let event = PaymentEvent {
transaction_id: "tx-456".to_string(),
amount: 99.99,
};
client.raise_event_typed("order-123", "PaymentReceived", &event).await?;
Use Cases:
- Human approvals
- Webhook callbacks
- External system notifications
- Timer-based triggers from external schedulers
Enqueue Event (Queue Semantics)
// Send a message into an orchestration's named queue
client.enqueue_event("chat-42", "inbox", r#"{"text": "Hello"}"#).await?;
// Typed variant (auto-serializes)
client.enqueue_event_typed("chat-42", "inbox", &ChatMessage { text: "Hello".into() }).await?;
Use Cases:
- Chat / conversational orchestrations
- Command streams to long-running orchestrations
- Any pattern where messages arrive before the orchestration subscribes
Poll Custom Status
let mut version = 0u64;
loop {
match client.wait_for_status_change("order-123", version, poll_interval, timeout).await? {
OrchestrationStatus::Running { custom_status: Some(s), custom_status_version: v, .. } => {
version = v;
let progress: MyProgress = serde_json::from_str(&s).unwrap();
println!("Progress: {}%", progress.pct);
}
OrchestrationStatus::Completed { .. } => break,
_ => break,
}
}
Tip: wait_for_status_change blocks until the status version advances past
last_seen_version, making it efficient for progress polling without tight loops.
Cancel an Orchestration
client.cancel_instance("order-123", "Customer requested cancellation").await?;
Behavior:
- Enqueues a
CancelInstancework item - Runtime processes it on next turn
- Orchestration moves to
Failedstatus with cancellation error - Cascading cancellation: Child sub-orchestrations are also cancelled
- Activity cancellation: In-flight activities receive cancellation signal (see Cooperative Activity Cancellation)
What happens to in-flight activities:
- The activity's
ActivityContextreceives the cancellation signal - Cooperative activities can check
ctx.is_cancelled()and clean up gracefully - The runtime waits up to
activity_cancellation_grace_period(default 30s) for the activity to finish - If the activity doesn't finish in time, it's forcibly aborted
- The activity result (success or failure) is dropped since the orchestration is already terminal
Example with graceful cleanup:
// Activity that responds to cancellation
activities.register("ProcessBatch", |ctx: ActivityContext, batch: String| async move {
let items: Vec<Item> = serde_json::from_str(&batch)?;
for (i, item) in items.iter().enumerate() {
// Check cancellation between items
if ctx.is_cancelled() {
ctx.trace_info(format!("Cancelled after processing {} of {} items", i, items.len()));
return Err("Batch processing cancelled".to_string());
}
process_item(item).await?;
}
Ok(format!("Processed {} items", items.len()))
});
Check Status
use duroxide::OrchestrationStatus;
let status = client.get_orchestration_status("order-123").await?; // Returns Result<OrchestrationStatus, ClientError>
match status {
OrchestrationStatus::Running { .. } => println!("Still processing..."),
OrchestrationStatus::Completed { output, .. } => println!("Done: {}", output),
OrchestrationStatus::Failed { details, .. } => {
eprintln!("Failed: {}", details.display_message());
eprintln!("Category: {}", details.category());
}
OrchestrationStatus::NotFound => println!("Instance doesn't exist"),
}
Returns:
Running- Orchestration in progressCompleted { output }- Successful completion with resultFailed { details }- Failed with error detailsNotFound- Instance doesn't exist or no history
Wait for Completion
use std::time::Duration;
// Wait up to 30 seconds
let status = client.wait_for_orchestration("order-123", Duration::from_secs(30)).await?;
match status {
OrchestrationStatus::Completed { output, .. } => {
println!("Success: {}", output);
}
OrchestrationStatus::Failed { details, .. } => {
eprintln!("Failed: {}", details.display_message());
}
_ => unreachable!("wait_for_orchestration only returns terminal states"),
}
// Typed output
#[derive(Deserialize)]
struct OrderResult {
order_id: String,
status: String,
}
let result: OrderResult = client
.wait_for_orchestration_typed("order-123", Duration::from_secs(30))
.await?;
Behavior:
- Polls status with exponential backoff (5ms → 100ms)
- Returns when terminal state reached or timeout
- Only returns
CompletedorFailed, neverRunningorNotFound
Read KV Values
Read durable key-value state from any orchestration instance, even while running.
// Read a single KV value
let val: Option<String> = client.get_kv_value("order-123", "status").await?;
// Read with automatic deserialization
let config: MyConfig = client.get_kv_value_typed::<MyConfig>("order-123", "config").await?
.expect("config should exist");
// Poll until a value appears (with timeout)
let val = client.wait_for_kv_value("order-123", "ready_flag", Duration::from_secs(10)).await?;
// Poll with typed deserialization
let result: MyResult = client
.wait_for_kv_value_typed::<MyResult>("order-123", "result", Duration::from_secs(30))
.await?
.expect("result should exist");
Behavior:
get_kv_valuereads directly from the provider (no history replay needed)wait_for_kv_valuepolls with exponential backoff until the key exists or timeout- Values survive
continue_as_newand are readable after orchestration completion - Returns
Nonefor nonexistent keys or instances
Management Operations (Optional)
These methods require a provider that implements ProviderAdmin. Check availability first:
if client.has_management_capability() {
// Management operations available
} else {
// Provider doesn't support management
}
List Instances
// List all instances
let instances = client.list_all_instances().await?;
for instance in instances {
println!("Instance: {}", instance);
}
// Filter by status
let completed = client.list_instances_by_status("Completed").await?;
let running = client.list_instances_by_status("Running").await?;
let failed = client.list_instances_by_status("Failed").await?;
Get Instance Metadata
let info = client.get_instance_info("order-123").await?;
println!("Instance: {}", info.instance_id);
println!("Orchestration: {} v{}", info.orchestration_name, info.orchestration_version);
println!("Status: {}", info.status);
println!("Current Execution: {}", info.current_execution_id);
if let Some(output) = info.output {
println!("Output: {}", output);
}
Inspect Executions
// List all execution IDs for an instance
let executions = client.list_executions("order-123").await?;
println!("Executions: {:?}", executions); // [1, 2, 3] if ContinueAsNew used
// Get execution metadata
let exec_info = client.get_execution_info("order-123", 1).await?;
println!("Status: {}", exec_info.status);
println!("Events: {}", exec_info.event_count);
// Read execution history
let history = client.read_execution_history("order-123", 1).await?;
for event in history {
println!("Event: {:?}", event);
}
System Metrics
let metrics = client.get_system_metrics().await?;
println!("Total instances: {}", metrics.total_instances);
println!("Running: {}", metrics.running_instances);
println!("Completed: {}", metrics.completed_instances);
println!("Failed: {}", metrics.failed_instances);
let queues = client.get_queue_depths().await?;
println!("Orchestrator queue: {}", queues.orchestrator_queue);
println!("Worker queue: {}", queues.worker_queue);
Orchestration Stats
Query per-instance runtime introspection stats computed on-demand from the provider:
use duroxide::Client;
let stats = client.get_orchestration_stats("my-instance").await?;
match stats {
Some(s) => {
println!("History events: {}", s.history_event_count);
println!("History size: {} bytes", s.history_size_bytes);
println!("KV keys: {}", s.kv_user_key_count);
println!("KV value bytes: {}", s.kv_total_value_bytes);
println!("Pending carry-forward: {}", s.queue_pending_count);
}
None => println!("Instance not found"),
}
The SystemStats struct contains:
| Field | Description |
|---|---|
history_event_count | Total events in history for the current execution |
history_size_bytes | Approximate serialized size of the full history |
queue_pending_count | Unprocessed messages carried forward from the previous execution |
kv_user_key_count | Number of KV keys for the instance |
kv_total_value_bytes | Sum of all KV value sizes |
Client Usage Patterns
Fire-and-Forget
// Start orchestration and don't wait
client.start_orchestration("background-job-1", "ProcessData", "input").await?;
// Continue with other work immediately
Synchronous Wait
// Start and wait for completion
client.start_orchestration("sync-job-1", "ProcessData", "input").await?;
let status = client.wait_for_orchestration("sync-job-1", Duration::from_secs(30)).await?;
Polling for Progress
client.start_orchestration("long-job-1", "ProcessData", "input").await?;
loop {
let status = client.get_orchestration_status("long-job-1").await?; // Returns Result<OrchestrationStatus, ClientError>
match status {
OrchestrationStatus::Running { .. } => {
println!("Still running...");
tokio::time::sleep(Duration::from_secs(5)).await;
}
OrchestrationStatus::Completed { output, .. } => {
println!("Completed: {}", output);
break;
}
OrchestrationStatus::Failed { details, .. } => {
eprintln!("Failed: {}", details.display_message());
break;
}
_ => break,
}
}
Human-in-the-Loop
// In your orchestration:
async fn approval_workflow(ctx: OrchestrationContext, order_id: String) -> Result<String, String> {
let result = ctx.schedule_activity("SubmitForApproval", order_id).await?;
// Wait for approval event
let approval = ctx.schedule_wait("ManagerApproval").await;
Ok(approval)
}
// External system receives approval and raises event:
async fn handle_approval(client: &Client, order_id: &str, approved: bool) {
let event_data = serde_json::json!({
"approved": approved,
"approver": "manager@company.com",
"timestamp": chrono::Utc::now().to_rfc3339(),
});
client.raise_event(
&format!("order-{}", order_id),
"ManagerApproval",
event_data.to_string()
).await.unwrap();
}
Dashboard / Monitoring
async fn show_dashboard(client: &Client) {
if !client.has_management_capability() {
println!("Management features not available");
return;
}
// System overview
let metrics = client.get_system_metrics().await.unwrap();
println!("=== System Status ===");
println!("Total: {} | Running: {} | Completed: {} | Failed: {}",
metrics.total_instances,
metrics.running_instances,
metrics.completed_instances,
metrics.failed_instances
);
// Queue health
let queues = client.get_queue_depths().await.unwrap();
println!("\n=== Queue Depths ===");
println!("Orchestrator: {} | Worker: {}",
queues.orchestrator_queue,
queues.worker_queue
);
// Recent failures
let failed = client.list_instances_by_status("Failed").await.unwrap();
println!("\n=== Recent Failures ===");
for instance in failed.iter().take(10) {
let info = client.get_instance_info(instance).await.unwrap();
println!("{}: {} ({})", instance, info.orchestration_name, info.status);
}
}
Error Handling
use duroxide::OrchestrationStatus;
match client.start_orchestration("test", "MyOrch", "input").await {
Ok(()) => println!("Started successfully"),
Err(e) => eprintln!("Failed to start: {}", e),
}
// Wait with timeout handling
match client.wait_for_orchestration("test", Duration::from_secs(10)).await {
Ok(OrchestrationStatus::Completed { output, .. }) => println!("Done: {}", output),
Ok(OrchestrationStatus::Failed { details, .. }) => {
eprintln!("Orchestration failed: {}", details.display_message());
// Check error category for handling
match details.category() {
"application" => {
// Business logic error - may want to retry with different input
}
"configuration" => {
// Nondeterminism detected - requires code fix
}
"infrastructure" => {
// Provider/database error - may be transient
// Note: Poison errors (unregistered handlers after backoff) also fall here
}
_ => {}
}
}
Err(e) => eprintln!("Timeout or error: {:?}", e),
_ => {}
}
Client API Summary
| Method | Purpose | Returns |
|---|---|---|
start_orchestration() | Start new instance | Result<(), ClientError> |
raise_event() | Send ephemeral external event | Result<(), ClientError> |
enqueue_event() | Send queue message (FIFO) | Result<(), ClientError> |
cancel_instance() | Request cancellation | Result<(), ClientError> |
get_orchestration_status() | Check status | Result<OrchestrationStatus, ClientError> |
wait_for_orchestration() | Wait for completion | Result<OrchestrationStatus, ClientError> |
wait_for_status_change() | Poll custom status | Result<OrchestrationStatus, ClientError> |
get_kv_value() | Read KV value | Result<Option<String>, ClientError> |
get_kv_all_values() | Read all KV values | Result<HashMap<String, String>, ClientError> |
get_kv_value_typed() | Read KV value (deserialized) | Result<Option<T>, ClientError> |
wait_for_kv_value() | Poll for KV value | Result<String, ClientError> |
wait_for_kv_value_typed() | Poll for KV value (deserialized) | Result<T, ClientError> |
has_management_capability() | Check feature availability | bool |
list_all_instances() | List instances | Result<Vec<String>, ClientError> |
list_instances_by_status() | Filter by status | Result<Vec<String>, ClientError> |
get_instance_info() | Instance metadata | Result<InstanceInfo, ClientError> |
get_execution_info() | Execution metadata | Result<ExecutionInfo, ClientError> |
list_executions() | List execution IDs | Result<Vec<u64>, ClientError> |
read_execution_history() | Read history | Result<Vec<Event>, ClientError> |
get_system_metrics() | System stats | Result<SystemMetrics, ClientError> |
get_queue_depths() | Queue depths | Result<QueueDepths, ClientError> |
get_orchestration_stats() | Per-instance stats | Result<Option<SystemStats>, ClientError> |
Common Patterns
Session Affinity Pattern
Use schedule_activity_on_session when you need multiple activities to run on the
same worker process — typically to share in-memory state like caches, connection pools,
or loaded ML models.
async fn copilot_conversation(ctx: OrchestrationContext, config: String) -> Result<String, String> {
// Generate a deterministic session ID (replay-safe)
let session_id = ctx.new_guid().await?;
// First turn: hydrates session state on the claiming worker
ctx.schedule_activity_on_session("HydrateSession", &config, &session_id).await?;
// Subsequent turns: routed to the same worker, reusing in-memory state
loop {
let user_input = ctx.schedule_wait("user_message").await;
let response = ctx.schedule_activity_on_session(
"RunTurn", &user_input, &session_id
).await?;
if response == "done" {
break;
}
}
Ok("conversation complete".to_string())
}
Worker side — managing session state:
// Use an LRU cache keyed by session_id
static SESSION_CACHE: LazyLock<Mutex<LruCache<String, SessionState>>> = ...;
.register("RunTurn", |ctx: ActivityContext, input: String| async move {
let session_id = ctx.session_id().expect("must be session-bound");
let mut cache = SESSION_CACHE.lock().unwrap();
let state = cache.get_mut(session_id)
.ok_or("unknown_session — needs re-hydration")?;
let result = state.process(&input).await;
Ok(result)
})
Session migration handling: When a session migrates (worker death, idle timeout), the new worker won't have the in-memory state. Handle this reactively:
match ctx.schedule_activity_on_session("RunTurn", &input, &session_id).await {
Ok(result) => { /* success */ }
Err(e) if e.contains("unknown_session") => {
// Re-hydrate on the new worker
ctx.schedule_activity_on_session("HydrateSession", &config, &session_id).await?;
ctx.schedule_activity_on_session("RunTurn", &input, &session_id).await?;
}
Err(e) => return Err(e),
}
Rolling upgrade caveat: Session-bound activities are pinned to the owning worker until the session unpins. During rolling upgrades, ensure all workers have the session activity handlers registered before orchestrations schedule session activities.
1. Function Chaining (Sequential Steps)
async fn process_order(ctx: OrchestrationContext, order_json: String) -> Result<String, String> {
ctx.trace_info("Starting order processing");
// Step 1: Validate
let validated = ctx.schedule_activity("ValidateOrder", order_json)
.await?;
// Step 2: Charge payment
let payment_result = ctx.schedule_activity("ChargePayment", validated)
.await?;
// Step 3: Ship order
let tracking = ctx.schedule_activity("ShipOrder", payment_result)
.await?;
// Step 4: Send confirmation
let confirmation = ctx.schedule_activity("SendConfirmation", tracking)
.await?;
ctx.trace_info("Order processing completed");
Ok(confirmation)
}
2. Fan-Out/Fan-In (Parallel Processing)
async fn process_users(ctx: OrchestrationContext, user_ids_json: String) -> Result<String, String> {
// Parse input
let user_ids: Vec<String> = serde_json::from_str(&user_ids_json)?;
// Fan-out: Schedule all tasks in parallel
let futures: Vec<_> = user_ids
.into_iter()
.map(|user_id| ctx.schedule_activity("ProcessUser", user_id))
.collect();
// Fan-in: Wait for all to complete (deterministic order)
let results = ctx.join(futures).await;
// Process results - each result is Result<String, String>
let mut successes = 0;
for result in results {
match result {
Ok(_) => successes += 1,
Err(e) => {
ctx.trace_warn(format!("User processing failed: {}", e));
}
}
}
Ok(format!("Processed {} users successfully", successes))
}
3. Human-in-the-Loop with Timeout
async fn approval_workflow(ctx: OrchestrationContext, request_json: String) -> Result<String, String> {
// Submit approval request
let request_id = ctx.schedule_activity("SubmitApprovalRequest", request_json)
.await?;
// Wait for approval or timeout
let timeout = ctx.schedule_timer(std::time::Duration::from_millis(86_400_000)); // 24 hours
let approval = ctx.schedule_wait("ApprovalEvent");
match ctx.select2(timeout, approval).await {
Either2::First(()) => {
// Timed out - escalate
ctx.trace_warn("Approval timed out, escalating");
ctx.schedule_activity("EscalateApproval", request_id)
.await
}
Either2::Second(approval_data) => {
// Approved - use the data
ctx.trace_info("Approval received");
Ok(approval_data)
}
}
}
4. Error Handling and Compensation (Saga Pattern)
async fn saga_orchestration(ctx: OrchestrationContext, order_json: String) -> Result<String, String> {
// Step 1: Reserve inventory
let reservation = match ctx.schedule_activity("ReserveInventory", order_json.clone())
.await
{
Ok(r) => r,
Err(e) => {
ctx.trace_error(format!("Reservation failed: {}", e));
return Err("Reservation failed".to_string());
}
};
// Step 2: Charge payment
match ctx.schedule_activity("ChargePayment", reservation.clone())
.await
{
Ok(payment) => Ok(payment),
Err(e) => {
// Compensation: Release inventory
ctx.trace_warn(format!("Payment failed: {}, releasing inventory", e));
ctx.schedule_activity("ReleaseInventory", reservation)
.await?;
Err("Payment failed, order cancelled".to_string())
}
}
}
5. Retry with Backoff
Preferred approach - Use the built-in schedule_activity_with_retry():
use duroxide::{RetryPolicy, BackoffStrategy};
async fn retry_orchestration(ctx: OrchestrationContext, task_input: String) -> Result<String, String> {
// Simple: 3 attempts with default exponential backoff
let result = ctx.schedule_activity_with_retry(
"UnreliableTask",
task_input,
RetryPolicy::new(3),
).await?;
Ok(result)
}
// Custom policy with timeout and fixed backoff
async fn custom_retry(ctx: OrchestrationContext, input: String) -> Result<String, String> {
let policy = RetryPolicy::new(5)
.with_timeout(std::time::Duration::from_secs(30)) // Per-attempt timeout
.with_backoff(BackoffStrategy::Exponential {
base: std::time::Duration::from_millis(100),
multiplier: 2.0,
max: std::time::Duration::from_secs(10),
});
ctx.schedule_activity_with_retry("Task", input, policy).await
}
Backoff strategies available:
BackoffStrategy::None- No delay between retriesBackoffStrategy::Fixed { delay }- Same delay every retryBackoffStrategy::Linear { base, max }- delay = base × attempt- `BackoffStrategy::Exponential { base, multiplier, max }$ - \text{delay} = \text{base} \times \text{multiplier}^(\text{attempt}-1)
\text{Behavior} \text{notes}:
- \text{Activity} \text{errors} \text{trigger} \text{retries} (\text{with} \text{backoff})
- \text{Timeouts} \text{exit} \text{immediately} (\text{no} \text{retry})
- \text{A} \text{warning} \text{trace} \text{is} \text{logged} \text{on} \text{each} \text{retry} \text{attempt}
\text{Manual} \text{retry} \text{loop} (\text{for} \text{advanced} \text{customization}):
$``rust async fn manual_retry(ctx: OrchestrationContext, task_input: String) -> Result<String, String> { let max_attempts = 5; let mut delay = std::time::Duration::from_secs(1);
for attempt in 1..=max_attempts {
ctx.trace_info(format!("Attempt {} of {}", attempt, max_attempts));
match ctx.schedule_activity("UnreliableTask", task_input.clone())
.await
{
Ok(result) => return Ok(result),
Err(e) => {
ctx.trace_warn(format!("Attempt {} failed: {}", attempt, e));
if attempt < max_attempts {
ctx.schedule_timer(delay).await;
delay *= 2;
} else {
return Err(format!("Failed after {} attempts", max_attempts));
}
}
}
}
unreachable!()
}
### 6. Understanding Error Types
Duroxide classifies errors into three categories for proper handling and metrics:
**Application Errors** - Business logic failures that your orchestration code sees and handles:
```rust
async fn order_workflow(ctx: OrchestrationContext, input: String) -> Result<String, String> {
match ctx.schedule_activity("ProcessPayment", input).await {
Ok(txn_id) => Ok(format!("paid:{txn_id}")),
Err(e) => {
// This is an Application error - you handle it
// Examples: "insufficient_funds", "invalid_card"
ctx.trace_warn(format!("Payment failed: {e}"));
Err(format!("payment_error:{e}"))
}
}
}
Configuration Errors - Deployment issues that abort orchestration execution before your code runs:
- Nondeterminism (code changed between replays) - Immediate failure
- Unregistered orchestrations/activities - Uses exponential backoff, eventually fails as Poison
For unregistered handlers, the runtime automatically backs off to support rolling deployments:
- Message is abandoned with exponential backoff (1s → 2s → 4s → ... up to 60s)
- Eventually picked up by a node with the handler registered
- If all retry attempts exhaust, fails as
ErrorDetails::Poison
Your orchestration code never sees these errors. The runtime:
- Records the failure in history (ActivityFailed + OrchestrationFailed events)
- Marks the orchestration as failed with ErrorDetails::Configuration (nondeterminism) or ErrorDetails::Poison (unregistered)
- Requires deployment fix (register missing handler, fix nondeterministic code)
Infrastructure Errors - Provider/system failures that abort execution:
- Database connection failures
- Data corruption
- Queue operation failures
These also abort before your code runs, may be retryable by the runtime.
Checking Error Category (for metrics/logging):
match client.get_orchestration_status("inst-1").await? { // Returns Result<OrchestrationStatus, ClientError>
OrchestrationStatus::Failed { details, .. } => {
match details.category() {
"infrastructure" => alert_ops_team(), // System issue
"configuration" => alert_dev_team(), // Deployment issue
"application" => log_business_error(), // Expected failures
}
}
_ => {}
}
7. Loop and Accumulate
async fn accumulate_results(ctx: OrchestrationContext, items_json: String) -> Result<String, String> {
let items: Vec<String> = serde_json::from_str(&items_json)?;
let mut accumulator = Vec::new();
for (i, item) in items.iter().enumerate() {
ctx.trace_info(format!("Processing item {} of {}", i + 1, items.len()));
let result = ctx.schedule_activity("ProcessItem", item.clone())
.await?;
accumulator.push(result);
}
let summary = serde_json::to_string(&accumulator).unwrap();
Ok(summary)
}
7. Conditional Workflow
async fn conditional_workflow(ctx: OrchestrationContext, order_json: String) -> Result<String, String> {
// Get order value
let order_value: f64 = ctx.schedule_activity("GetOrderValue", order_json.clone())
.await?;
if order_value > 1000.0 {
// High value - require approval
ctx.trace_info("High-value order, requiring approval");
let approval_request = ctx.schedule_activity("CreateApprovalRequest", order_json.clone())
.await?;
let timeout = ctx.schedule_timer(std::time::Duration::from_millis(3600_000)); // 1 hour
let approval = ctx.schedule_wait("ManagerApproval");
let (winner, _) = ctx.select2(timeout, approval).await;
if winner == 0 {
ctx.trace_warn("Approval timeout");
return Err("Approval timeout".to_string());
}
}
// Process the order
ctx.schedule_activity("ProcessOrder", order_json).await
}
8. Sub-Orchestrations (Composition)
async fn parent_workflow(ctx: OrchestrationContext, batch_json: String) -> Result<String, String> {
let batches: Vec<String> = serde_json::from_str(&batch_json)?;
// Process each batch as a sub-orchestration
let mut results = Vec::new();
for batch in batches {
let result = ctx.schedule_sub_orchestration("ProcessBatch", batch)
.await?;
results.push(result);
}
Ok(serde_json::to_string(&results).unwrap())
}
async fn child_workflow(ctx: OrchestrationContext, batch: String) -> Result<String, String> {
// Child has its own event history, isolated from parent
let processed = ctx.schedule_activity("ProcessBatchItems", batch)
.await?;
ctx.trace_info("Batch processing completed");
Ok(processed)
}
9. Continue As New (Long-Running Workflows)
async fn eternal_monitor(ctx: OrchestrationContext, state_json: String) -> Result<(), String> {
let state: MonitorState = serde_json::from_str(&state_json)?;
// Do one iteration of work
let check_result = ctx.schedule_activity("CheckSystem", state_json)
.await?;
// Wait before next check
ctx.schedule_timer(std::time::Duration::from_millis(60_000)).await; // 1 minute
// Continue with updated state - await and return to terminate this execution
return ctx.continue_as_new(check_result).await;
}
Anti-Patterns (What to Avoid)
❌ Anti-Pattern 1: Business Logic / Control Flow in Activities
// WRONG - Activity contains business logic and branching
activities.register("ProcessOrderComplex", |ctx: ActivityContext, order_json: String| async move {
let order: Order = serde_json::from_str(&order_json)?;
// ❌ Business logic in activity
if order.total > 1000.0 {
// Send for approval
send_approval_request(&order).await?;
wait_for_approval(&order.id).await?; // Polling/waiting in activity
}
// ❌ Control flow in activity
if order.customer_type == "premium" {
process_premium(&order).await?;
} else {
process_standard(&order).await?;
}
Ok("processed".to_string())
});
// CORRECT - Business logic in orchestration, activities are single-purpose
// Activities: single-purpose, can sleep/poll internally if needed
activities.register("GetOrderValue", |ctx: ActivityContext, order_json: String| async move {
let order: Order = serde_json::from_str(&order_json)?;
Ok(order.total.to_string())
});
activities.register("SendApprovalRequest", |ctx: ActivityContext, order_json: String| async move {
let order: Order = serde_json::from_str(&order_json)?;
send_to_approval_system(&order).await?;
Ok(order.id)
});
activities.register("ProcessPremiumOrder", |ctx: ActivityContext, order_json: String| async move {
let order: Order = serde_json::from_str(&order_json)?;
// Activity can poll/sleep internally - that's fine!
let vm = provision_vm(&order).await?;
while !vm_ready(&vm.id).await {
tokio::time::sleep(Duration::from_secs(5)).await; // ✅ OK in activity
}
process_on_vm(&order, &vm).await?;
Ok("processed".to_string())
});
// Orchestration: contains business logic and control flow
async fn good_orch(ctx: OrchestrationContext, order_json: String) -> Result<String, String> {
// Get order value
let value_str = ctx.schedule_activity("GetOrderValue", order_json.clone())
.await?;
let value: f64 = value_str.parse().unwrap();
// ✅ Business logic in orchestration
if value > 1000.0 {
// Send for approval
ctx.schedule_activity("SendApprovalRequest", order_json.clone())
.await?;
// ✅ Orchestration-level timeout control
let timeout = ctx.schedule_timer(std::time::Duration::from_millis(3600_000)); // 1 hour
let approval = ctx.schedule_wait("ApprovalEvent");
let (winner, _) = ctx.select2(timeout, approval).await;
if winner == 0 {
return Err("Approval timeout".to_string());
}
}
// ✅ Control flow in orchestration
let customer_type = ctx.schedule_activity("GetCustomerType", order_json.clone())
.await?;
if customer_type == "premium" {
ctx.schedule_activity("ProcessPremiumOrder", order_json).await
} else {
ctx.schedule_activity("ProcessStandardOrder", order_json).await
}
}
Why:
- Activities are execution units - they do ONE thing well
- Orchestrations are coordination units - they contain business logic and control flow
- Control flow in activities breaks composability and replay benefits
- An activity CAN sleep/poll internally (e.g., provisioning a VM) - that's perfectly fine!
- What matters: keep activities single-purpose, pull multi-step logic into orchestrations
❌ Anti-Pattern 2: Non-Deterministic Branching
// WRONG - Random branching
async fn bad_orch(ctx: OrchestrationContext) -> Result<String, String> {
if rand::random::<bool>() { // Different on each replay!
ctx.schedule_activity("PathA", "").await
} else {
ctx.schedule_activity("PathB", "").await
}
}
// CORRECT - Deterministic branching based on activity result
async fn good_orch(ctx: OrchestrationContext) -> Result<String, String> {
let decision = ctx.schedule_activity("MakeDecision", "").await?;
if decision == "A" {
ctx.schedule_activity("PathA", "").await
} else {
ctx.schedule_activity("PathB", "").await
}
}
❌ Anti-Pattern 3: Using tokio::select! or tokio::join!
let result = ctx.schedule_activity("Task", "input").await?;
Ok(result)
}
### ❌ Anti-Pattern 4: Using Activity for Pure Delays/Timeouts
```rust
// WRONG - Activity that only sleeps (no business logic)
activities.register("Wait30Seconds", |ctx: ActivityContext, _: String| async move {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
Ok("done".to_string())
});
async fn bad_delay(ctx: OrchestrationContext) -> Result<String, String> {
ctx.schedule_activity("Wait30Seconds", "").await?;
// Just wasted a worker slot for 30 seconds!
Ok("done".to_string())
}
// CORRECT - Use timer for pure delays
async fn good_delay(ctx: OrchestrationContext) -> Result<String, String> {
ctx.schedule_timer(std::time::Duration::from_millis(30_000)).await;
// Timer doesn't block workers, handled by timer dispatcher
Ok("done".to_string())
}
Why:
- If the ONLY purpose is to delay, use
schedule_timer()- it's designed for that - Activities are for doing work (DB, API, computation), not just waiting
- BUT: It's perfectly fine for an activity to sleep/poll AS PART of doing work:
// ✅ FINE - Activity polls as part of provisioning work activities.register("ProvisionVM", |ctx: ActivityContext, config: String| async move { let vm_id = start_vm_creation(config).await?; // Polling is part of the provisioning logic while !vm_ready(&vm_id).await { tokio::time::sleep(Duration::from_secs(5)).await; // ✅ OK - part of work } Ok(vm_id) }); - The rule: Use timers for orchestration-level delays, not for activity-internal polling
❌ Anti-Pattern 5: Infinite Loop Without Delay
// WRONG - Infinite loop hammering activities
async fn bad_poll(ctx: OrchestrationContext, task_id: String) -> Result<String, String> {
loop {
let status = ctx.schedule_activity("CheckStatus", task_id.clone())
.await?;
if status == "complete" {
return Ok(status);
}
// No delay - creates unlimited activities!
}
}
// CORRECT - Loop with timer delays
async fn good_poll(ctx: OrchestrationContext, task_id: String) -> Result<String, String> {
for _ in 0..100 { // Max iterations to prevent infinite history
let status = ctx.schedule_activity("CheckStatus", task_id.clone())
.await?;
if status == "complete" {
return Ok(status);
}
ctx.schedule_timer(std::time::Duration::from_millis(5000)).await; // Wait 5s before retry
}
Err("Polling timeout".to_string())
}
Better: Use Continue As New for eternal polling:
async fn eternal_poll(ctx: OrchestrationContext, task_id: String) -> Result<String, String> {
let status = ctx.schedule_activity("CheckStatus", task_id.clone())
.await?;
if status == "complete" {
Ok(status)
} else {
ctx.schedule_timer(std::time::Duration::from_millis(5000)).await;
return ctx.continue_as_new(task_id).await; // Fresh history for next iteration
}
}
❌ Anti-Pattern 6: Non-Deterministic Data Structures
use std::collections::HashMap;
// WRONG - HashMap iteration order is not guaranteed
async fn bad_graph(ctx: OrchestrationContext, _input: String) -> Result<String, String> {
let mut nodes: HashMap<String, Node> = HashMap::new();
nodes.insert("a".to_string(), Node { ... });
nodes.insert("b".to_string(), Node { ... });
nodes.insert("c".to_string(), Node { ... });
// Serializing to JSON - order varies between runs!
let graph_json = serde_json::to_string(&nodes)?;
// This activity may receive different JSON on replay,
// causing nondeterminism detection to fail
ctx.schedule_activity("ProcessGraph", graph_json)
.await
}
use std::collections::BTreeMap;
// CORRECT - BTreeMap maintains consistent ordering
async fn good_graph(ctx: OrchestrationContext, _input: String) -> Result<String, String> {
let mut nodes: BTreeMap<String, Node> = BTreeMap::new();
nodes.insert("a".to_string(), Node { ... });
nodes.insert("b".to_string(), Node { ... });
nodes.insert("c".to_string(), Node { ... });
// BTreeMap serializes in consistent key order
let graph_json = serde_json::to_string(&nodes)?;
ctx.schedule_activity("ProcessGraph", graph_json)
.await
}
Data structures to AVOID in orchestration code:
| Avoid | Use Instead | Why |
|---|---|---|
HashMap<K, V> | BTreeMap<K, V> | HashMap iteration order varies between runs |
HashSet<T> | BTreeSet<T> | HashSet iteration order is non-deterministic |
IndexMap with into_iter() after remove | BTreeMap | IndexMap preserves insertion order but remove+insert can change it |
dashmap, flurry, etc. | BTreeMap | Concurrent maps have non-deterministic iteration |
Why this matters:
When an orchestration replays, it must make the same decisions as the original run. If you serialize a HashMap to pass to an activity, the JSON key order can differ between the original run and replay. The runtime detects this as a nondeterminism violation because the activity input changed.
Safe patterns:
- Use
BTreeMap/BTreeSetwhen order matters for serialization Vecis fine—maintains insertion orderIndexMapis safe IF you only insert (never remove then re-insert)- Build ordered structures in activities if you need hash-based performance
Complete Examples
E-Commerce Order Processing
use duroxide::OrchestrationContext;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Order {
order_id: String,
customer_id: String,
items: Vec<OrderItem>,
total: f64,
}
#[derive(Serialize, Deserialize)]
struct OrderItem {
product_id: String,
quantity: u32,
}
async fn process_order_orchestration(
ctx: OrchestrationContext,
order_json: String,
) -> Result<String, String> {
let order: Order = serde_json::from_str(&order_json)
.map_err(|e| format!("Invalid order JSON: {}", e))?;
ctx.trace_info(format!("Processing order {}", order.order_id));
// 1. Validate order
let validation_result = ctx.schedule_activity("ValidateOrder", order_json.clone())
.await?;
// 2. Reserve inventory for all items (fan-out)
let reservation_futures: Vec<_> = order.items.iter()
.map(|item| {
let item_json = serde_json::to_string(item).unwrap();
ctx.schedule_activity("ReserveInventory", item_json)
})
.collect();
let reservation_results = ctx.join(reservation_futures).await;
// Check all reservations succeeded - each result is Result<String, String>
let all_reserved = reservation_results.iter().all(|r| r.is_ok());
if !all_reserved {
ctx.trace_error("Inventory reservation failed");
return Err("Insufficient inventory".to_string());
}
// 3. Charge payment
let payment_result = ctx.schedule_activity("ChargePayment", order_json.clone())
.await;
match payment_result {
Ok(payment_id) => {
// 4. Fulfill order
ctx.trace_info("Payment successful, fulfilling order");
let fulfillment = ctx.schedule_activity("FulfillOrder", order_json)
.await?;
Ok(fulfillment)
}
Err(payment_error) => {
// Compensation: Release all reservations
ctx.trace_warn("Payment failed, compensating");
let release_futures: Vec<_> = order.items.iter()
.map(|item| {
let item_json = serde_json::to_string(item).unwrap();
ctx.schedule_activity("ReleaseInventory", item_json)
})
.collect();
ctx.join(release_futures).await;
Err(format!("Payment failed: {}", payment_error))
}
}
}
Document Approval Workflow
async fn document_approval(
ctx: OrchestrationContext,
document_id: String,
) -> Result<String, String> {
ctx.trace_info(format!("Starting approval for document {}", document_id));
// 1. Request manager approval
let approval_request = ctx.schedule_activity(
"SendApprovalRequest",
format!("{{\"document_id\": \"{}\", \"level\": \"manager\"}}", document_id)
).await?;
// 2. Wait for approval or timeout (24 hours)
let manager_timeout = ctx.schedule_timer(std::time::Duration::from_millis(86_400_000));
let manager_approval = ctx.schedule_wait(format!("ManagerApproval_{}", document_id));
let approval_data = match ctx.select2(manager_timeout, manager_approval).await {
Either2::First(()) => {
// Manager didn't respond - auto-escalate to director
ctx.trace_warn("Manager approval timeout, escalating to director");
ctx.schedule_activity(
"SendApprovalRequest",
format!("{{\"document_id\": \"{}\", \"level\": \"director\"}}", document_id)
).await?;
// Wait for director (no timeout)
ctx.schedule_wait(format!("DirectorApproval_{}", document_id))
.await
}
Either2::Second(data) => {
// Manager approved
ctx.trace_info("Manager approved document");
data
}
};
// 3. Finalize document
let finalize_input = format!("{{\"document_id\": \"{}\", \"approval\": {}}}",
document_id, approval_data);
ctx.schedule_activity("FinalizeDocument", finalize_input)
.await
}
Batch Processing with Continue As New
async fn batch_processor(
ctx: OrchestrationContext,
cursor: String,
) -> Result<String, String> {
ctx.trace_info(format!("Processing batch starting at cursor: {}", cursor));
// Fetch next batch
let batch_result = ctx.schedule_activity("FetchBatch", cursor)
.await?;
let batch: BatchResult = serde_json::from_str(&batch_result)?;
// Process items in this batch (fan-out)
let process_futures: Vec<_> = batch.items.iter()
.map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
.collect();
let results = ctx.join(process_futures).await;
// Count successes - each result is Result<String, String>
let success_count = results.iter().filter(|r| r.is_ok()).count();
ctx.trace_info(format!("Processed {} of {} items", success_count, batch.items.len()));
// Continue to next batch or complete
if let Some(next_cursor) = batch.next_cursor {
ctx.trace_info("More batches to process, continuing");
return ctx.continue_as_new(next_cursor).await; // Start fresh with new cursor
} else {
ctx.trace_info("All batches processed");
Ok(format!("Batch processing complete"))
}
}
#[derive(Deserialize)]
struct BatchResult {
items: Vec<String>,
next_cursor: Option<String>,
}
Testing Your Orchestrations
Unit Testing with Replay Engine
For testing replay behavior, action emission, and completion delivery without the full runtime,
see the replay engine test suite in tests/replay_engine/. These tests use a minimal harness
that verifies orchestration logic against synthetic history.
Example pattern from tests/replay_engine/sequential_progress.rs:
/// Tests a simple activity schedule and completion.
///
/// Orchestration code:
/// ```ignore
/// async fn invoke(&self, ctx: OrchestrationContext, _input: String) -> Result<String, String> {
/// let result = ctx.schedule_activity("Task", "input").await?;
/// Ok(result)
/// }
/// ```
#[test]
fn activity_scheduled_and_completed() {
let history = vec![
started_event(1), // OrchestrationStarted
activity_scheduled(2, "Task", "input"), // schedule_activity()
activity_completed(3, 2, "result"), // activity done
];
let handler = SingleActivityHandler::new("Task", "input");
let result = run_replay(history, handler);
assert_eq!(result.status, TurnStatus::Completed(Ok("result".to_string())));
}
For comprehensive replay tests, run:
cargo nt -E 'test(/replay_engine::/)' # 113 tests covering replay behavior
Integration Testing (With Runtime)
#[tokio::test]
async fn test_full_workflow() {
let store = Arc::new(SqliteProvider::new_in_memory().await.unwrap());
let activities = ActivityRegistry::builder()
.register("Task", |ctx: ActivityContext, input: String| async move {
Ok(format!("processed: {}", input))
})
.build();
let orch = |ctx: OrchestrationContext, input: String| async move {
ctx.schedule_activity("Task", input).await
};
let orchestrations = OrchestrationRegistry::builder()
.register("TestOrch", orch)
.build();
let rt = runtime::Runtime::start_with_store(
store.clone(),
activities,
orchestrations,
).await;
let client = Client::new(store);
client.start_orchestration("test-1", "TestOrch", "input").await.unwrap();
let status = client.wait_for_orchestration("test-1", std::time::Duration::from_secs(5))
.await
.unwrap();
match status {
OrchestrationStatus::Completed { output, .. } => {
assert_eq!(output, "processed: input");
}
_ => panic!("Expected completion"),
}
rt.shutdown(None).await;
}
Best Practices
1. Keep Activities Idempotent
// Good: Idempotent database update
activities.register("UpdateUserStatus", |ctx: ActivityContext, user_json: String| async move {
let user: User = serde_json::from_str(&user_json)?;
// Upsert is idempotent
db.execute("INSERT INTO users (id, status) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET status = ?",
[&user.id, &user.status, &user.status])?;
Ok(user.id)
});
2. Use Correlation IDs
async fn order_with_tracking(ctx: OrchestrationContext, order_json: String) -> Result<String, String> {
// Generate correlation ID for entire workflow
let correlation_id = ctx.new_guid().await?;
// Pass correlation ID to all activities
let validated = ctx.schedule_activity(
"ValidateOrder",
format!("{{\"order\": {}, \"correlation_id\": \"{}\"}}", order_json, correlation_id)
).await?;
// Activities can log with correlation ID for tracing
// ...
Ok(correlation_id)
}
3. Handle Partial Failures
async fn robust_batch_processing(ctx: OrchestrationContext, items_json: String) -> Result<String, String> {
let items: Vec<String> = serde_json::from_str(&items_json)?;
// Process all items (don't stop on first failure)
let futures: Vec<_> = items.iter()
.map(|item| ctx.schedule_activity("ProcessItem", item.clone()))
.collect();
let results = ctx.join(futures).await;
// Collect successes and failures - each result is Result<String, String>
let mut successes = Vec::new();
let mut failures = Vec::new();
for (i, result) in results.iter().enumerate() {
match result {
Ok(output) => {
successes.push((i, output.clone()));
}
Err(error) => {
failures.push((i, error.clone()));
ctx.trace_warn(format!("Item {} failed: {}", i, error));
}
}
}
if failures.is_empty() {
Ok(format!("All {} items succeeded", successes.len()))
} else {
Ok(format!("{} succeeded, {} failed", successes.len(), failures.len()))
}
}
4. Use Typed APIs When Possible
#[derive(Serialize, Deserialize)]
struct OrderInput {
customer_id: String,
items: Vec<String>,
}
#[derive(Serialize, Deserialize)]
struct OrderOutput {
order_id: String,
status: String,
}
async fn typed_workflow(
ctx: OrchestrationContext,
order: OrderInput,
) -> Result<OrderOutput, String> {
// Compile-time type safety
let validation: ValidationResult = ctx
.schedule_activity_typed("ValidateOrder", &order)
.await?;
let order_id = ctx.new_guid().await?;
Ok(OrderOutput {
order_id,
status: "completed".to_string(),
})
}
Orchestration Versioning
When running long-lived orchestrations in production, you'll need to evolve their logic over time while instances are still running. Duroxide supports orchestration versioning to handle this safely.
Key Concepts
Version Registration:
// Default version (1.0.0)
OrchestrationRegistry::builder()
.register_typed(MY_ORCHESTRATION, my_orchestration)
// Explicit versions
.register_versioned_typed(MY_ORCHESTRATION, "1.0.1", my_orchestration_v1_0_1)
.register_versioned_typed(MY_ORCHESTRATION, "1.0.2", my_orchestration_v1_0_2)
.build();
Version Upgrade Timing (Critical):
Version upgrades happen at continue_as_new time, not when the server restarts. If an old version has a timer running, it must complete before the upgrade occurs.
T+0:00 Server restarts with v1.0.2 registered
But v1.0.1 was mid-cycle with ~1 min left on its timer
T+1:00 v1.0.1's timer expires, completes its work
T+1:01 v1.0.1 calls continue_as_new() → resolves to v1.0.2
Database updated: orchestration_version = "1.0.2"
Best Practices:
- Keep the orchestration NAME constant stable across all versions
- Create separate functions per version:
my_orch(),my_orch_v1_0_1(),my_orch_v1_0_2() - Add version prefix to trace logs:
ctx.trace_info("[v1.0.2] Starting cycle...") - Never remove old version registrations while instances might still be running them
📚 See versioning-best-practices.md for comprehensive patterns including code organization, common scenarios, and a complete best practices checklist.
Debugging Tips
1. Use Trace Statements
async fn debuggable_orch(ctx: OrchestrationContext, input: String) -> Result<String, String> {
ctx.trace_info(format!("Starting with input: {}", input));
let step1 = ctx.schedule_activity("Step1", input).await?;
ctx.trace_info(format!("Step1 completed: {}", step1));
let step2 = ctx.schedule_activity("Step2", step1).await?;
ctx.trace_info(format!("Step2 completed: {}", step2));
ctx.trace_info("Workflow completed successfully");
Ok(step2)
}
2. Inspect History
// Get orchestration history (requires management capability)
let client = Client::new(store);
let history = client.read_execution_history("my-instance", 1).await.unwrap();
for event in history {
println!("{:?}", event);
}
3. Inspect Deterministic Flow
Use trace statements and history inspection to understand orchestration progress across turns. Each turn is triggered by new completions and is replay-safe; there is no public turn index.
Advanced Patterns
Dynamic Activity Scheduling
async fn dynamic_workflow(ctx: OrchestrationContext, config_json: String) -> Result<String, String> {
let config: WorkflowConfig = serde_json::from_str(&config_json)?;
// Determine activities to run based on input
let activity_names = match config.workflow_type.as_str() {
"standard" => vec!["Step1", "Step2", "Step3"],
"expedited" => vec!["FastStep1", "FastStep2"],
"manual" => vec!["ManualReview"],
_ => return Err("Unknown workflow type".to_string()),
};
// Schedule determined activities
let mut result = config_json;
for activity_name in activity_names {
result = ctx.schedule_activity(activity_name, result)
.await?;
}
Ok(result)
}
Actor-Style Orchestrations
// Orchestration as a long-lived actor
async fn order_actor(ctx: OrchestrationContext, state_json: String) -> Result<(), String> {
let mut state: OrderState = serde_json::from_str(&state_json)?;
// Wait for next command
let command = ctx.schedule_wait("OrderCommand").await;
let cmd: Command = serde_json::from_str(&command)?;
match cmd.action.as_str() {
"cancel" => {
state.status = "cancelled".to_string();
ctx.schedule_activity("SendCancellationEmail", state_json).await?;
Ok(()) // Terminal
}
"ship" => {
state.status = "shipped".to_string();
ctx.schedule_activity("ShipOrder", serde_json::to_string(&state)?).await?;
// Continue waiting for more commands
return ctx.continue_as_new(serde_json::to_string(&state)?).await;
}
_ => {
// Unknown command, continue waiting
return ctx.continue_as_new(state_json).await;
}
}
}
Performance Considerations
1. Minimize History Size
Each orchestration turn appends events. Keep history manageable:
// ❌ Bad - Creates millions of events
async fn bad_loop(ctx: OrchestrationContext, _input: String) -> Result<String, String> {
for i in 0..1_000_000 { // Too many!
ctx.schedule_activity("Process", i.to_string()).await?;
}
Ok(())
}
// ✅ Good - Use Continue As New to reset history
async fn good_loop(ctx: OrchestrationContext, state_json: String) -> Result<String, String> {
let state: State = serde_json::from_str(&state_json)?;
// Process batch of 100
for i in 0..100 {
ctx.schedule_activity("Process", (state.offset + i).to_string())
.await?;
}
state.offset += 100;
if state.offset < state.total {
return ctx.continue_as_new(serde_json::to_string(&state)?).await; // Fresh history
} else {
Ok("All processed".to_string())
}
}
2. Batch Operations
// ❌ Slow - One DB call per item
async fn slow_updates(ctx: OrchestrationContext, items: Vec<String>) -> Result<(), String> {
for item in items {
ctx.schedule_activity("UpdateItem", item).await?;
}
Ok(())
}
// ✅ Fast - Batch update in single activity
async fn fast_updates(ctx: OrchestrationContext, items_json: String) -> Result<(), String> {
ctx.schedule_activity("UpdateItemsBatch", items_json).await?;
Ok(())
}
Common Gotchas
1. Non-Deterministic Control Flow
Symptom: Orchestration replays differently, causes nondeterminism errors
Fix: Base all decisions on activity results or external events, never on time/random
2. Activities That Only Sleep
Symptom: Activity that only sleeps without doing actual work
Fix: Use ctx.schedule_timer() in orchestration; activities can sleep/poll as part of their work (e.g., provisioning resources)
3. Using tokio::select! or tokio::join! in Orchestrations
Symptom: Non-deterministic behavior on replay
Fix: Use ctx.select2() / ctx.select3() and ctx.join() instead. These use Duroxide's replay-safe combinators for deterministic polling and winner selection.
Checklist for Production Orchestrations
- All side effects in activities (no I/O in orchestration code)
- Orchestration delays use
schedule_timer()(activities can sleep/poll as needed) - All branching is deterministic (based on activity results)
- Activities are idempotent (can retry safely)
- Error handling for all activities (match on Result)
- Trace statements for observability (
ctx.trace_*()in both orchestrations and activities) - Continue As New for long-running workflows (avoid unbounded history)
- Timeouts on external events (use select2 with timer)
- Compensation logic for failures (saga pattern)
- Tests written (unit + integration)
Quick Reference
OrchestrationContext API
| Method | Returns | Use For |
|---|---|---|
schedule_activity(name, input) | impl Future<Output = Result<String, String>> | Database, HTTP, file I/O |
schedule_timer(delay) | impl Future<Output = ()> | Delays, timeouts, scheduling |
schedule_wait(event_name) | impl Future<Output = String> | External signals, human input |
schedule_sub_orchestration(name, input) | impl Future<Output = Result<String, String>> | Child workflows |
schedule_orchestration(name, instance, input) | () | Fire-and-forget |
select2(a, b) | Either2<T1, T2> | First to complete |
join(vec) | Vec<T> | Wait for all |
continue_as_new(input) | impl Future (never resolves) | Reset history, keep running |
new_guid() | impl Future<Output = Result<String, String>> | Correlation IDs |
utc_now() | impl Future<Output = Result<u64, String>> | Timestamps |
set_kv_value(key, value) | () | Store durable KV pair |
get_kv_value(key) | Option<String> | Read local KV |
get_kv_all_values() | HashMap<String, String> | Snapshot of all KV entries |
get_kv_all_keys() | Vec<String> | List all KV keys |
get_kv_length() | usize | Count of KV entries |
clear_kv_value(key) | () | Remove a KV entry |
clear_all_kv_values() | () | Remove all KV entries |
prune_kv_values_updated_before(ms) | usize | Remove old snapshot keys |
get_kv_value_from_instance(id, key) | impl Future<Output = Result<Option<String>, String>> | Cross-instance KV read |
trace_info/warn/error(msg) | () | Logging |
Getting Help
- Examples:
examples/directory - runnable samples - Tests:
tests/e2e_samples.rs- comprehensive patterns - API Docs: Run
cargo doc --open - Replay Debugging: Check history with
client.read_execution_history()(management capability required) - Cross-Crate Composition: See
docs/cross-crate-registry-pattern.mdfor organizing orchestrations across multiple crates
You now have everything needed to build production-grade durable workflows! 🎉