Worker Design & Implementation
June 22, 2026 · View on GitHub
Version: 4.1 | Date: 2025-11-28 | SDK: 1.3.0+
Recent Updates (v4.0):
- ✅ AsyncTaskRunner: Pure async/await execution (zero thread overhead for async workers)
- ✅ Auto-Detection: Automatic runner selection based on
defvsasync def - ✅ Async HTTP:
httpx.AsyncClientfor non-blocking poll/update operations - ✅ Direct Execution:
await worker_fn()- no thread context switches - ✅ Process Isolation: One process per worker, clients created after fork
What is a Worker?
Workers are task execution units in Netflix Conductor that poll for and execute tasks within workflows. When a workflow reaches a task, Conductor queues it for execution. Workers continuously poll Conductor for tasks matching their registered task types, execute the business logic, and return results.
Key Concepts:
- Task: Unit of work in a workflow (e.g., "send_email", "process_payment")
- Worker: Python function (sync or async) decorated with
@worker_taskthat implements task logic - Polling: Workers actively poll Conductor for pending tasks
- Execution: Workers run task logic and return results (success, failure, or in-progress)
- Scalability: Multiple workers can process the same task type concurrently
Example Workflow:
Workflow: Order Processing
├── Task: validate_order (worker: order_validator)
├── Task: charge_payment (worker: payment_processor)
└── Task: send_confirmation (worker: email_sender)
Each task is executed by a dedicated worker that polls for that specific task type.
Quick Start
Sync Worker
from conductor.client.worker.worker_task import worker_task
@worker_task(task_definition_name='process_data', thread_count=5)
def process_data(input_value: int) -> dict:
result = expensive_computation(input_value)
return {'result': result}
Async Worker (Automatic High Concurrency)
@worker_task(task_definition_name='fetch_data', thread_count=50)
async def fetch_data(url: str) -> dict:
# Automatically runs as non-blocking coroutine
# Higher concurrency for I/O-bound workloads
async with httpx.AsyncClient() as client:
response = await client.get(url)
return {'data': response.json()}
Start Workers
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
with TaskHandler(
configuration=Configuration(),
scan_for_annotated_workers=True,
import_modules=['my_app.workers']
) as handler:
handler.start_processes()
handler.join_processes()
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│ Main Process: TaskHandler │
│ • Discovers workers (@worker_task decorator) │
│ • Auto-detects sync (def) vs async (async def) │
│ • Spawns one Process per worker │
│ • Manages worker lifecycle │
└─────────────────────────────────────────────────────────────────────────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────────────┐ ┌───────────────────────┐ ┌──────────────────────┐
│ Process 1 │ │ Process 2 │ │ Process 3 │
│ Worker: fetch_data │ │ Worker: process_cpu │ │ Worker: send_email │
│ Type: async def │ │ Type: def │ │ Type: async def │
├──────────────────────┤ ├───────────────────────┤ ├──────────────────────┤
│ │ │ │ │ │
│ AsyncTaskRunner │ │ TaskRunner │ │ AsyncTaskRunner │
│ ┌────────────────┐ │ │ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │ Event Loop │ │ │ │ ThreadPool │ │ │ │ Event Loop │ │
│ │ (asyncio) │ │ │ │ (thread_count) │ │ │ │ (asyncio) │ │
│ └────────────────┘ │ │ └────────────────┘ │ │ └────────────────┘ │
│ │ │ │ │ │ │ │ │
│ │ Polling │ │ │ Polling │ │ │ Polling │
│ ▼ │ │ ▼ │ │ ▼ │
│ ┌────────────────┐ │ │ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │ async def poll │ │ │ │ Sync poll │ │ │ │ async def poll │ │
│ │ (httpx.Async) │ │ │ │ (requests) │ │ │ │ (httpx.Async) │ │
│ └────────────────┘ │ │ └────────────────┘ │ │ └────────────────┘ │
│ │ │ │ │ │ │ │ │
│ ▼ │ │ ▼ │ │ ▼ │
│ ┌────────────────┐ │ │ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │await worker_fn │ │ │ │executor.submit │ │ │ │await worker_fn │ │
│ │ (direct!) │ │ │ │ worker_fn() │ │ │ │ (direct!) │ │
│ └────────────────┘ │ │ └────────────────┘ │ │ └────────────────┘ │
│ │ │ │ │ │ │ │ │
│ Semaphore │ │ Executor Capacity │ │ Semaphore │
│ (limits execution) │ │ (limits execution) │ │ (limits execution) │
│ │ │ │ │ │ │ │ │
│ ▼ │ │ ▼ │ │ ▼ │
│ ┌────────────────┐ │ │ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │async def update│ │ │ │ Sync update │ │ │ │async def update│ │
│ │ (httpx.Async) │ │ │ │ (requests) │ │ │ │ (httpx.Async) │ │
│ └────────────────┘ │ │ └────────────────┘ │ │ └────────────────┘ │
│ │ │ │ │ │
│ Threads: 1 │ │ Threads: 1+N │ │ Threads: 1 │
│ Concurrency: High │ │ Concurrency: N │ │ Concurrency: High │
└──────────────────────┘ └───────────────────────┘ └──────────────────────┘
Legend:
├─ Process boundary (isolation)
│ ├─ AsyncTaskRunner: Pure async/await, single event loop
│ └─ TaskRunner: Thread pool executor
└─ Auto-selected based on function signature (def vs async def)
Worker Execution
Execution mode is automatically detected based on function signature:
Sync Workers (def) → TaskRunner
- Execute in ThreadPoolExecutor (thread pool)
- Uses
TaskRunnerfor polling/execution - Blocking poll/update (requests library)
- Best for: CPU-bound tasks, blocking I/O
- Concurrency: Limited by
thread_count(number of threads) - Threads: 1 (main) + thread_count (pool)
Async Workers (async def) → AsyncTaskRunner
- Execute directly in async event loop (pure async/await)
- Uses
AsyncTaskRunnerfor polling/execution - Non-blocking poll/update (httpx.AsyncClient)
- Best for: I/O-bound tasks (HTTP, DB, file operations)
- Concurrency: Higher than sync workers for I/O-bound workloads
- Automatic: No configuration needed
- Threads: 1 (event loop only)
- Can return
None: Async tasks can legitimately returnNoneas their result
Key Benefits of AsyncTaskRunner:
- Zero Thread Overhead: Single event loop per process (no ThreadPoolExecutor, no BackgroundEventLoop)
- Direct Execution:
await worker_fn()- no thread context switches - Async HTTP: Uses
httpx.AsyncClientfor non-blocking polling/updates - Memory Efficient: Lower memory footprint per process
- High Concurrency: Up to
thread_counttasks running concurrently viaasyncio.gather() - Accurate Timing: Execution time measured from start to completion
Implementation Details:
# Async worker - automatically uses AsyncTaskRunner
@worker_task(task_definition_name='fetch_data', thread_count=50)
async def fetch_data(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
# Can also return None explicitly
@worker_task(task_definition_name='log_event')
async def log_event(event: str) -> None:
await logger.log(event)
return None # This works correctly!
# Or no return statement (implicit None)
@worker_task(task_definition_name='notify')
async def notify(message: str):
await send_notification(message)
# Implicit None return - works correctly!
Async Flow (AsyncTaskRunner):
- TaskHandler detects
async defworker function - Creates
AsyncTaskRunnerinstead ofTaskRunner - Process runs
asyncio.run(async_task_runner.run()) - Single event loop handles: async poll → async execute → async update
- Up to
thread_counttasks run concurrently viaasyncio.gather() - No thread context switches - pure async/await
Sync Flow (TaskRunner):
- TaskHandler detects
defworker function - Creates
TaskRunner(existing behavior) - Process runs thread-based polling/execution
- Works exactly as before (backward compatible)
AsyncTaskRunner Architecture
Design Goals
The AsyncTaskRunner eliminates thread overhead for async workers by using pure async/await execution:
Problem with BackgroundEventLoop approach:
Main Thread → polls (blocking httpx.Client)
→ ThreadPoolExecutor thread → detects coroutine
→ BackgroundEventLoop thread → runs async task
Thread count: 3 threads + 5+ context switches per task
Solution with AsyncTaskRunner:
Single Event Loop → await async_poll()
→ await async_execute() (direct!)
→ await async_update()
Thread count: 1 thread (event loop) + 0 context switches
Key Implementation Details
1. Auto-Detection in TaskHandler
# task_handler.py:272
is_async_worker = inspect.iscoroutinefunction(worker.execute_function)
if is_async_worker:
async_task_runner = AsyncTaskRunner(...)
process = Process(target=self.__run_async_runner, args=(async_task_runner,))
else:
task_runner = TaskRunner(...)
process = Process(target=task_runner.run)
User Impact: None - completely transparent
2. Client Creation After Fork
# async_task_runner.py:107
async def run(self):
# Create async HTTP client in subprocess (after fork)
# httpx.AsyncClient is not picklable, so we defer creation
self.async_api_client = AsyncApiClient(...)
self.async_task_client = AsyncTaskResourceApi(...)
# Create semaphore in event loop
self._semaphore = asyncio.Semaphore(self._max_workers)
Why: httpx.AsyncClient and asyncio.Semaphore are not picklable and must be created in the subprocess
3. Direct Async Execution
# async_task_runner.py:364
async def __async_execute_task(self, task: Task):
# Get worker parameters
task_input = {...}
# Direct await - NO threads, NO BackgroundEventLoop!
task_output = await self.worker.execute_function(**task_input)
# Build TaskResult
return task_result
Benefit: Zero thread overhead, direct coroutine execution
4. Concurrency Control & Batch Polling
Both TaskRunner and AsyncTaskRunner use dynamic batch polling:
# Calculate available slots (both runners)
current_capacity = len(self._running_tasks) # + pending_async for TaskRunner
available_slots = self._max_workers - current_capacity
# Batch poll with dynamic count (both runners)
tasks = batch_poll(available_slots) # or await async_batch_poll(available_slots)
# As tasks complete, available_slots increases
# As new tasks are polled, available_slots decreases
TaskRunner - ThreadPoolExecutor limits concurrency:
# Capacity controlled by executor + tracking
for task in tasks:
future = self._executor.submit(execute_and_update, task)
self._running_tasks.add(future) # Track futures
# ThreadPoolExecutor queues excess tasks automatically
AsyncTaskRunner - Semaphore limits execution:
# Capacity controlled by tracking + semaphore during execution
for task in tasks:
asyncio_task = asyncio.create_task(execute_and_update(task))
self._running_tasks.add(asyncio_task) # Track asyncio tasks
# Inside execute_and_update:
async def __async_execute_and_update_task(self, task):
async with self._semaphore: # Limit to thread_count concurrent
task_result = await self.__async_execute_task(task)
await self.__async_update_task(task_result)
Key Insight: Both use the same batch polling logic with dynamic capacity calculation. The difference is in how concurrency is limited:
- TaskRunner: ThreadPoolExecutor naturally limits concurrent threads
- AsyncTaskRunner: Semaphore explicitly limits concurrent executions
Semantics: thread_count means "max concurrent executions" in both models
5. Task Tracking
# async_task_runner.py:184
asyncio_task = asyncio.create_task(self.__async_execute_and_update_task(task))
self._running_tasks.add(asyncio_task)
asyncio_task.add_done_callback(self._running_tasks.discard) # Auto-cleanup
Benefit: Automatic cleanup, no manual tracking needed
Performance Comparison
| Metric | TaskRunner (Async) | AsyncTaskRunner | Notes |
|---|---|---|---|
| Threads per worker | 3 (main + pool + event loop) | 1 (event loop only) | Fewer threads |
| Context switches/task | 5+ | 0 | No context switches |
| Latency overhead | Thread context switches | Direct await | Lower latency |
| Throughput (I/O) | Limited by threads | Limited by event loop | Higher for I/O workloads |
Feature Parity
AsyncTaskRunner has 100% feature parity with TaskRunner:
| Feature | TaskRunner | AsyncTaskRunner | Notes |
|---|---|---|---|
| Batch polling | ✅ | ✅ | Uses AsyncTaskResourceApi |
| Token refresh | ✅ | ✅ | Identical logic with backoff |
| Event publishing | ✅ | ✅ | All 6 events, same timing |
| Metrics collection | ✅ | ✅ | Via event listeners |
| Custom listeners | ✅ | ✅ | Same event_listeners param |
| Configuration | ✅ | ✅ | Same 3-tier hierarchy |
| Adaptive backoff | ✅ | ✅ | Same exponential logic |
| Auth backoff | ✅ | ✅ | Same 2^failures logic |
| Capacity limits | ✅ | ✅ | Semaphore vs ThreadPool |
| Task retry | ✅ | ✅ | 4 attempts, 10s/20s/30s |
| Error handling | ✅ | ✅ | Same exception handling |
Critical Implementation Notes
⚠️ Pickling Constraints
AsyncTaskRunner defers creation of non-picklable objects until after fork:
# __init__: Set to None (will be pickled)
self.async_api_client = None
self.async_task_client = None
self._semaphore = None
# run(): Create in subprocess
async def run(self):
# NOW safe to create (after fork, in event loop)
self.async_api_client = AsyncApiClient(...)
self._semaphore = asyncio.Semaphore(...)
Objects that CANNOT be pickled:
httpx.AsyncClient(contains event loop state)asyncio.Semaphore(tied to specific event loop)- Any object with async resources
⚠️ Token Refresh in Async Context
The sync version calls __refresh_auth_token() in __init__, but async cannot use await in __init__. Solution: lazy token fetch on first API call:
# async_api_client.py:640
async def __get_authentication_headers(self):
if self.configuration.AUTH_TOKEN is None:
if self.configuration.authentication_settings is None:
return None
# Lazy fetch on first call
token = await self.__get_new_token(skip_backoff=False)
self.configuration.update_token(token)
Configuration
Hierarchy (highest priority first)
- Worker-specific env:
conductor.worker.<worker_name>.<property> - Global env:
conductor.worker.all.<property> - Code:
@worker_task(property=value)
Supported Properties
| Property | Type | Default | Description |
|---|---|---|---|
poll_interval_millis | int | 100 | Polling interval (ms) |
thread_count | int | 1 | Concurrent tasks (sync) or concurrency limit (async) |
domain | str | None | Worker domain |
worker_id | str | auto | Worker identifier |
poll_timeout | int | 100 | Poll timeout (ms) |
lease_extend_enabled | bool | False | ⚠️ Not implemented - use TaskInProgress instead |
register_task_def | bool | False | Auto-register task definition with JSON schemas (draft-07) |
overwrite_task_def | bool | True | Overwrite existing task definitions (when register_task_def=True) |
strict_schema | bool | False | Enforce strict schema validation (additionalProperties=false) |
paused | bool | False | Pause worker (env-only, not in decorator) |
Examples
Code:
@worker_task(
task_definition_name='process_order',
poll_interval_millis=1000,
thread_count=5,
domain='dev'
)
def process_order(order_id: str): pass
Environment Variables:
# Global
export conductor.worker.all.domain=production
export conductor.worker.all.thread_count=20
# Worker-specific (overrides global)
export conductor.worker.process_order.thread_count=50
Result: domain=production, thread_count=50
Automatic Task Definition Registration
When register_task_def=True, the worker automatically registers its task definition with Conductor on startup, including JSON schemas generated from type hints.
Example:
from dataclasses import dataclass
@dataclass
class OrderInfo:
order_id: str
amount: float
customer_id: int
@worker_task(
task_definition_name='process_order',
register_task_def=True # Auto-register on startup
)
def process_order(order: OrderInfo, priority: int = 1) -> dict:
return {'status': 'processed', 'order_id': order.order_id}
What Gets Registered:
-
Task Definition:
process_order -
Input Schema (
process_order_input):
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"order": {
"type": "object",
"properties": {
"order_id": {"type": "string"},
"amount": {"type": "number"},
"customer_id": {"type": "integer"}
},
"required": ["order_id", "amount", "customer_id"]
},
"priority": {"type": "integer"}
},
"required": ["order"]
}
- Output Schema (
process_order_output):
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object"
}
Supported Types:
- Basic:
str,int,float,bool,dict,list - Optional:
Optional[T] - Collections:
List[T],Dict[str, T] - Dataclasses (with recursive field conversion)
- Union types (filters out TaskInProgress/None for return types)
Behavior:
- ✅ Skips if task definition already exists (no overwrite)
- ✅ Skips if schemas already exist (no overwrite)
- ✅ Workers start even if registration fails (just logs warning)
- ✅ Works for both sync and async workers
- ⚠️ Only works for function-based workers (
@worker_taskdecorator) - ⚠️ Class-based workers not supported (no execute_function attribute)
Environment Override:
# Enable for all workers (Unix format - recommended)
export CONDUCTOR_WORKER_ALL_REGISTER_TASK_DEF=true
# Enable for specific worker (Unix format - recommended)
export CONDUCTOR_WORKER_PROCESS_ORDER_REGISTER_TASK_DEF=true
# Control overwrite behavior
export CONDUCTOR_WORKER_ALL_OVERWRITE_TASK_DEF=false # Don't overwrite existing
# Enable strict schema validation
export CONDUCTOR_WORKER_PROCESS_ORDER_STRICT_SCHEMA=true # No extra fields allowed
# Alternative: Dot notation (also works)
export conductor.worker.all.register_task_def=true
export conductor.worker.process_order.strict_schema=true
Schema Validation Modes
The strict_schema flag controls JSON Schema validation strictness:
Lenient Mode (default, strict_schema=False):
{
"type": "object",
"properties": {...},
"additionalProperties": true ← Extra fields allowed
}
Strict Mode (strict_schema=True):
{
"type": "object",
"properties": {...},
"additionalProperties": false ← Extra fields rejected
}
Use Cases:
- Lenient (default): Development, backward compatibility, flexible integrations
- Strict: Production, strict validation, contract enforcement
Example:
@worker_task(
task_definition_name='validate_order',
register_task_def=True,
strict_schema=True # Enforce strict validation
)
def validate_order(order_id: str, amount: float) -> dict:
return {}
# Generated schema will reject inputs with extra fields
Task Definition Overwrite Control
The overwrite_task_def flag controls update behavior:
Overwrite Mode (default, overwrite_task_def=True):
- Always calls
update_task_def()(overwrites existing) - Ensures server has latest configuration from code
- Use in development or when task config changes frequently
No-Overwrite Mode (overwrite_task_def=False):
- Checks if task exists first
- Only creates if doesn't exist
- Preserves manual changes on server
- Use when tasks are managed outside code
Example:
@worker_task(
task_definition_name='stable_task',
register_task_def=True,
overwrite_task_def=False # Don't overwrite existing config
)
def stable_worker(data: dict) -> dict:
return {}
# If task exists on server, keeps existing configuration
# If task doesn't exist, registers new one
Startup Configuration Logging
When workers start, they log their resolved configuration in a compact single-line format:
INFO - Conductor Worker[name=process_order, pid=12345, status=active, poll_interval=1000ms, domain=production, thread_count=50, poll_timeout=100ms, lease_extend=false]
This shows:
- Worker name and process ID (useful for multi-process debugging)
- Status (active/paused)
- All resolved configuration values
- Configuration source (code, global env, or worker-specific env)
Benefits:
- Quick verification of configuration in logs
- Process ID for debugging multi-process issues
- Easy debugging of environment variable issues
- Single-line format for log aggregation tools
Example logs:
INFO - Conductor Worker[name=greet_sync, pid=63761, status=active, poll_interval=100ms, thread_count=10, poll_timeout=100ms, lease_extend=false]
INFO - Conductor Worker[name=greet_async, pid=63762, status=active, poll_interval=100ms, thread_count=50, poll_timeout=100ms, lease_extend=false]
Note: Each worker runs in its own process, so each has a unique PID.
Worker Discovery
Automatic worker discovery from packages, similar to Spring's component scanning in Java.
Overview
The WorkerLoader class provides automatic discovery of workers decorated with @worker_task by scanning Python packages. This eliminates the need to manually register each worker.
Auto-Discovery Methods
Option 1: TaskHandler auto-discovery (Recommended)
from conductor.client.automator.task_handler import TaskHandler
handler = TaskHandler(
configuration=config,
scan_for_annotated_workers=True,
import_modules=['my_app.workers', 'my_app.tasks']
)
Option 2: Explicit WorkerLoader
from conductor.client.worker.worker_loader import auto_discover_workers
# Auto-discover workers from packages
loader = auto_discover_workers(
packages=['my_app.workers', 'my_app.tasks'],
print_summary=True
)
# Start task handler with discovered workers
handler = TaskHandler(configuration=config)
WorkerLoader API
from conductor.client.worker.worker_loader import WorkerLoader
loader = WorkerLoader()
# Scan multiple packages (recursive by default)
loader.scan_packages(['my_app.workers', 'shared.workers'])
# Scan specific modules
loader.scan_module('my_app.workers.order_tasks')
# Scan filesystem path
loader.scan_path('/app/workers', package_prefix='my_app.workers')
# Non-recursive scanning
loader.scan_packages(['my_app.workers'], recursive=False)
# Get discovered workers
workers = loader.get_workers()
print(f"Found {len(workers)} workers")
# Print discovery summary
loader.print_summary()
Convenience Functions
from conductor.client.worker.worker_loader import scan_for_workers, auto_discover_workers
# Quick scanning
loader = scan_for_workers('my_app.workers', 'my_app.tasks')
# Auto-discover with summary
loader = auto_discover_workers(
packages=['my_app.workers'],
print_summary=True
)
How It Works
- Package Scanning: The loader imports Python packages and modules
- Automatic Registration:
@worker_taskdecorators automatically register workers during import - Worker Retrieval: Loader retrieves registered workers from the global registry
- Execution Mode: Auto-detected from function signature (
defvsasync def)
Best Practices
1. Organize Workers by Domain
my_app/
├── workers/
│ ├── __init__.py
│ ├── order/ # Order-related workers
│ │ ├── process.py
│ │ └── validate.py
│ ├── payment/ # Payment-related workers
│ │ ├── charge.py
│ │ └── refund.py
│ └── notification/ # Notification workers
│ ├── email.py
│ └── sms.py
2. Environment-Specific Loading
import os
env = os.getenv('ENV', 'production')
if env == 'production':
packages = ['my_app.workers']
else:
packages = ['my_app.workers', 'my_app.test_workers']
loader = auto_discover_workers(packages=packages)
3. Use Package init.py Files
# my_app/workers/__init__.py
"""
Workers package - all worker modules auto-discovered
"""
Troubleshooting
Workers Not Discovered:
- Ensure packages have
__init__.pyfiles - Check package name is correct
- Verify
@worker_taskdecorator is present - Check for import errors in worker modules
Import Errors:
- Verify dependencies are installed
- Check
PYTHONPATHincludes necessary directories - Look for circular imports
Metrics & Monitoring
This design document describes how worker events flow through the SDK. The
current user-facing metrics setup, legacy and canonical metric catalogs,
WORKER_CANONICAL_METRICS behavior, Prometheus examples, and migration guidance
are maintained in ../../METRICS.md.
Keep metric names and PromQL examples out of this design document so the SDK has one source of truth for legacy and canonical metrics.
Polling Loop
Implementation (Both TaskRunner and AsyncTaskRunner)
Core polling loop with dynamic batch sizing:
def run_once(self):
# 1. Cleanup completed tasks immediately
cleanup_completed_tasks() # Removes done futures/asyncio tasks
# 2. Calculate available capacity dynamically
current_capacity = len(self._running_tasks)
if current_capacity >= self._max_workers:
time.sleep(0.001) # At capacity, wait briefly
return
# 3. Calculate how many tasks we can accept
available_slots = self._max_workers - current_capacity
# Example: thread_count=10, running=3 → available_slots=7
# 4. Adaptive backoff when queue is empty
if consecutive_empty_polls > 0:
delay = min(0.001 * (2 ** consecutive_empty_polls), poll_interval)
# Exponential: 1ms → 2ms → 4ms → 8ms → poll_interval
if time_since_last_poll < delay:
time.sleep(delay - time_since_last_poll)
return
# 5. Batch poll with available_slots count
tasks = batch_poll(available_slots) # Poll up to 7 tasks
# 6. Submit tasks for execution
if tasks:
for task in tasks:
# TaskRunner: executor.submit() → thread pool
# AsyncTaskRunner: asyncio.create_task() → event loop
submit_for_execution(task)
self._running_tasks.add(task_future)
consecutive_empty_polls = 0
else:
consecutive_empty_polls += 1
# Loop continues - as tasks complete, available_slots increases
Key Optimizations
Dynamic Batch Sizing:
- Batch size =
thread_count - currently_running - Automatically adjusts as tasks complete
- Prevents over-polling (respects capacity)
- Example flow with thread_count=10:
Poll 1: running=0 → batch_poll(10) → get 10 tasks Poll 2: running=10 → skip (at capacity) Poll 3: running=7 → batch_poll(3) → get 3 tasks Poll 4: running=2 → batch_poll(8) → get 8 tasks
Other Optimizations:
- Immediate cleanup: Completed tasks removed immediately for accurate capacity
- Adaptive backoff: Exponential backoff when queue empty (1ms → 2ms → 4ms → poll_interval)
- Batch polling: significant API call reduction vs polling one at a time
- Non-blocking checks: Fast capacity calculation (no locks needed)
Best Practices
Worker Selection
Choose the right execution mode based on workload:
# CPU-bound: Use sync workers with low thread_count
# (Python GIL limits CPU parallelism, use multiple processes instead)
@worker_task(task_definition_name='compute_task', thread_count=4)
def cpu_task(data: list) -> dict:
result = expensive_computation(data) # CPU-intensive
return {'result': result}
# I/O-bound sync: Use sync workers with higher thread_count
# (Blocking I/O: file reads, subprocess calls, legacy libraries)
@worker_task(task_definition_name='file_task', thread_count=20)
def io_sync(file_path: str) -> dict:
with open(file_path) as f: # Blocking I/O
data = f.read()
return {'data': data}
# I/O-bound async: Use async workers with high concurrency
# (Non-blocking I/O: HTTP, database, async file I/O)
# ✅ RECOMMENDED for HTTP/API calls, database queries
@worker_task(task_definition_name='api_task', thread_count=50)
async def io_async(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url) # Non-blocking I/O
return {'data': response.json()}
# Mixed workload: Use async with moderate concurrency
@worker_task(task_definition_name='mixed_task', thread_count=10)
async def mixed_task(url: str) -> dict:
# Async I/O
async with httpx.AsyncClient() as client:
data = await client.get(url)
# Some CPU work (still runs in event loop)
processed = process_data(data.json())
return {'result': processed}
Performance Guidelines:
| Workload | Worker Type | thread_count | Runner | Expected Throughput |
|---|---|---|---|---|
| CPU-bound | def | 1-4 | TaskRunner | Limited by GIL |
| I/O-bound sync | def | 10-50 | TaskRunner | Moderate |
| I/O-bound async | async def | 50-200 | AsyncTaskRunner | High |
Configuration
# Development
export conductor.worker.all.domain=dev
export conductor.worker.all.poll_interval_millis=1000
# Production - Sync Workers
export conductor.worker.all.domain=production
export conductor.worker.all.poll_interval_millis=250
export conductor.worker.all.thread_count=20
# Production - Async Workers
export conductor.worker.all.domain=production
export conductor.worker.all.poll_interval_millis=100 # Lower for async (less overhead)
export conductor.worker.my_async_task.thread_count=100 # Higher concurrency
Long-Running Tasks & Lease Extension
Task lease extension allows long-running tasks to maintain ownership and prevent timeouts during execution. When a worker polls a task, it receives a "lease" with a timeout period (defined by responseTimeoutSeconds in task definition).
⚠️ Important: Currently, only manual lease extension via TaskInProgress is implemented. The lease_extend_enabled configuration parameter exists but is not yet implemented - no automatic lease extension occurs.
Manual Lease Extension with TaskInProgress:
To extend a task lease, explicitly return TaskInProgress:
from conductor.client.context.task_context import TaskInProgress
from typing import Union
@worker_task(task_definition_name='batch_processor')
def process_batch(batch_id: str) -> Union[dict, TaskInProgress]:
ctx = get_task_context()
poll_count = ctx.get_poll_count()
# Process 100 items per poll
processed = process_next_100_items(batch_id, offset=poll_count * 100)
if processed < 100:
# All done
return {'status': 'completed', 'total_processed': poll_count * 100 + processed}
else:
# More work to do - extend lease
return TaskInProgress(
callback_after_seconds=30, # Re-queue in 30s
output={'progress': poll_count * 100 + processed}
)
Polling External Systems:
@worker_task(task_definition_name='wait_for_approval')
def wait_for_approval(request_id: str) -> Union[dict, TaskInProgress]:
approval_status = check_approval_system(request_id)
if approval_status == 'PENDING':
# Still waiting - extend lease
return TaskInProgress(
callback_after_seconds=30,
output={'status': 'waiting'}
)
elif approval_status == 'APPROVED':
return {'status': 'approved'}
else:
raise Exception(f"Request rejected: {approval_status}")
Task Definition Requirements:
Configure appropriate timeouts in your task definition:
{
"name": "long_processing_task",
"responseTimeoutSeconds": 300, // 5 min per execution (before returning TaskInProgress)
"timeoutSeconds": 3600, // 1 hour total timeout (all iterations combined)
"timeoutPolicy": "RETRY",
"retryCount": 3
}
Key Points:
- ⚠️
lease_extend_enabledparameter exists but is NOT implemented - has no effect - Manual lease extension only: Must return
TaskInProgressto extend lease responseTimeoutSeconds: How long worker has before returning result/TaskInProgresstimeoutSeconds: Total allowed time (all TaskInProgress callbacks combined)- Use
TaskInProgressfor checkpointing and progress tracking - Monitor
poll_countto prevent infinite loops - Set
responseTimeoutSecondsbased on your typical TaskInProgress interval
Choosing thread_count
For Sync Workers (TaskRunner):
thread_count= size of ThreadPoolExecutor- Each task consumes one thread
- Recommendation: 1-4 for CPU, 10-50 for I/O
For Async Workers (AsyncTaskRunner):
thread_count= max concurrent async tasks (semaphore limit)- All tasks share one event loop thread
- Recommendation: 50-200 for I/O workloads
- Higher values possible (event loop handles thousands of concurrent coroutines)
Example:
# Async worker with 100 concurrent tasks
@worker_task(task_definition_name='api_calls', thread_count=100)
async def make_api_call(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
# Only 1 thread (event loop) handles all 100 concurrent tasks!
# vs TaskRunner: would need 100 threads
Event-Driven Interceptors
The SDK uses a fully event-driven architecture for observability, metrics collection, and custom monitoring. All metrics are collected through event listeners, making the system extensible and decoupled from worker logic.
Overview
Architecture:
Worker Execution → Event Publishing → Multiple Listeners
├─ MetricsCollector (Prometheus)
├─ Custom Monitoring
└─ Audit Logging
Key Features:
- Fully Decoupled: Zero coupling between worker logic and observability
- Event-Driven Metrics: Prometheus metrics collected via event listeners
- Synchronous Events: Events published synchronously (no async overhead)
- Extensible: Add custom listeners without SDK changes
- Multiple Backends: Support Prometheus, Datadog, CloudWatch simultaneously
How Metrics Work:
The built-in MetricsCollector is implemented as an event listener that responds to task execution events. When you enable metrics, it's automatically registered as a listener.
Event Types
Task Runner Events:
PollStarted(task_type, worker_id, poll_count)- When batch poll startsPollCompleted(task_type, duration_ms, tasks_received)- When batch poll succeedsPollFailure(task_type, duration_ms, cause)- When batch poll failsTaskExecutionStarted(task_type, task_id, worker_id, workflow_instance_id)- When task execution beginsTaskExecutionCompleted(task_type, task_id, worker_id, workflow_instance_id, duration_ms, output_size_bytes)- When task completes (includes actual async execution time)TaskExecutionFailure(task_type, task_id, worker_id, workflow_instance_id, cause, duration_ms)- When task failsTaskUpdateFailure(task_type, task_id, worker_id, workflow_instance_id, cause, retry_count, task_result)- Critical! When task update fails after all retries (4 attempts with 10s/20s/30s backoff)
Event Properties:
- All events are dataclasses with type hints
duration_ms: Actual execution time (for async tasks: from submission to completion)output_size_bytes: Size of task result payloadpoll_count: Number of tasks requested in batch poll
Basic Usage
from conductor.client.event.task_runner_events import TaskRunnerEventsListener, TaskExecutionCompleted
class CustomMonitor(TaskRunnerEventsListener):
def on_task_execution_completed(self, event: TaskExecutionCompleted):
print(f"Task {event.task_id} completed in {event.duration_ms}ms")
print(f"Output size: {event.output_size_bytes} bytes")
# Register with TaskHandler
handler = TaskHandler(
configuration=config,
event_listeners=[CustomMonitor()]
)
Built-in Metrics Listener:
# MetricsCollector is automatically registered when metrics_settings is provided
handler = TaskHandler(
configuration=config,
metrics_settings=MetricsSettings(http_port=8000) # MetricsCollector auto-registered
)
Update Retry Logic & Failure Handling
Critical: Task updates are retried with exponential backoff
Both TaskRunner and AsyncTaskRunner implement robust retry logic for task updates:
Retry Configuration:
- 4 attempts total (0, 1, 2, 3)
- Exponential backoff: 10s, 20s, 30s between retries
- Idempotent: Safe to retry updates
- Event on final failure:
TaskUpdateFailurepublished
Why This Matters: Task updates are critical - if a worker executes a task successfully but fails to update Conductor, the task result is lost. The retry logic ensures maximum reliability.
Handling Update Failures:
class UpdateFailureHandler(TaskRunnerEventsListener):
"""Handle critical update failures after all retries exhausted."""
def on_task_update_failure(self, event: TaskUpdateFailure):
# CRITICAL: Task was executed but Conductor doesn't know!
# External intervention required
# Option 1: Alert operations team
send_pagerduty_alert(
f"CRITICAL: Task update failed after {event.retry_count} attempts",
task_id=event.task_id,
workflow_id=event.workflow_instance_id
)
# Option 2: Log to external storage for recovery
backup_db.save_task_result(
task_id=event.task_id,
result=event.task_result, # Contains the actual result that was lost
timestamp=event.timestamp,
error=str(event.cause)
)
# Option 3: Attempt custom recovery
try:
# Custom retry logic with different strategy
custom_update_service.update_task_with_custom_retry(event.task_result)
except Exception as e:
logger.critical(f"Recovery failed: {e}")
# Register handler
handler = TaskHandler(
configuration=config,
event_listeners=[UpdateFailureHandler()]
)
Advanced Examples
SLA Monitoring:
class SLAMonitor(TaskRunnerEventsListener):
def __init__(self, threshold_ms: float):
self.threshold_ms = threshold_ms
def on_task_execution_completed(self, event: TaskExecutionCompleted):
if event.duration_ms > self.threshold_ms:
alert(f"SLA breach: {event.task_type} took {event.duration_ms}ms")
Cost Tracking:
class CostTracker(TaskRunnerEventsListener):
def __init__(self, cost_per_second: dict):
self.cost_per_second = cost_per_second
self.total_cost = 0.0
def on_task_execution_completed(self, event: TaskExecutionCompleted):
rate = self.cost_per_second.get(event.task_type, 0.0)
cost = rate * (event.duration_ms / 1000.0)
self.total_cost += cost
Multiple Listeners:
handler = TaskHandler(
configuration=config,
metrics_settings=metrics_settings,
event_listeners=[
SLAMonitor(threshold_ms=5000),
CostTracker(cost_per_second={'ml_task': 0.05}),
CustomAuditLogger()
]
)
Benefits
- Performance: Synchronous event publishing (minimal overhead)
- Error Isolation: Listener failures don't affect worker execution
- Flexibility: Implement only the events you need
- Type Safety: Protocol-based with full type hints
- Metrics Integration: Built-in Prometheus metrics via
MetricsCollectorlistener
Implementation:
- Events are published synchronously (not async)
SyncEventDispatcherused for task runner events- All metrics collected through event listeners
- Zero coupling between worker logic and observability
Troubleshooting
High Memory
Cause: Too many worker processes
Fix: Increase thread_count per worker, reduce worker count
Async Tasks Not Running Concurrently
Cause: Function defined as def instead of async def
Fix: Change function signature to async def to enable AsyncTaskRunner (automatic)
Async Worker Using ThreadPoolExecutor Instead of AsyncTaskRunner
Cause: Worker function not properly detected as async Check:
- Function signature is
async def(notdef) - Check logs for "Created AsyncTaskRunner" vs "Created TaskRunner"
- Verify
inspect.iscoroutinefunction(worker.execute_function)returns True
AsyncClient Pickling Errors
Error: TypeError: cannot pickle 'httpx.AsyncClient' object
Cause: AsyncClient created before fork (in __init__)
Fix: Already handled in SDK 1.3.0+ - clients created in run() after fork
Note: If you're implementing custom runners, defer async client creation to after fork
Semaphore Errors in Async Workers
Error: RuntimeError: no running event loop
Cause: Semaphore created outside event loop
Fix: Already handled in SDK 1.3.0+ - semaphore created in run() within event loop
Token Refresh Not Working in Async Workers
Cause: Token refresh requires await but __init__ is not async
Fix: Already handled in SDK 1.3.0+ - lazy token fetch on first API call in __get_authentication_headers()
Async Task Returns None Not Working
Issue: SDK version < 1.3.0 - BackgroundEventLoop approach needed sentinel pattern Fix: Upgrade to SDK 1.3.0+ which uses AsyncTaskRunner (no sentinel needed, direct await)
Tasks Not Picked Up
Check:
- Domain:
export conductor.worker.all.domain=production - Worker registered:
loader.print_summary() - Not paused:
export conductor.worker.my_task.paused=false - Check logs for runner type: "AsyncTaskRunner" vs "TaskRunner"
Timeouts
Fix: Enable lease extension or increase task timeout in Conductor
Empty Metrics
Check:
metrics_settingspassed to TaskHandler- Workers actually executing tasks
- Directory has write permissions
- Both sync and async workers publish same metrics via events
Implementation Files
Core:
src/conductor/client/automator/task_handler.py- Orchestrator (auto-selects TaskRunner vs AsyncTaskRunner)src/conductor/client/automator/task_runner.py- Sync polling loop (ThreadPoolExecutor)src/conductor/client/automator/async_task_runner.py- Async polling loop (pure async/await)src/conductor/client/worker/worker.py- Worker + BackgroundEventLoop (sync workers only)src/conductor/client/worker/worker_task.py- @worker_task decoratorsrc/conductor/client/worker/worker_config.py- Config resolutionsrc/conductor/client/worker/worker_loader.py- Discoverysrc/conductor/client/telemetry/metrics_collector.py- Metrics
Async HTTP (AsyncTaskRunner only):
src/conductor/client/http/async_rest.py- AsyncRESTClientObject (httpx.AsyncClient)src/conductor/client/http/async_api_client.py- AsyncApiClient (token refresh, retries)src/conductor/client/http/api/async_task_resource_api.py- Async batch_poll/update_task
Tests:
tests/unit/automator/test_task_runner.py- TaskRunner unit teststests/unit/automator/test_async_task_runner.py- AsyncTaskRunner unit tests (17 tests, mocked HTTP)
Examples:
examples/asyncio_workers.pyexamples/workers_e2e.py- End-to-end async worker exampleexamples/compare_multiprocessing_vs_asyncio.pyexamples/worker_configuration_example.py
Testing
Unit Tests
AsyncTaskRunner Test Suite (tests/unit/automator/test_async_task_runner.py):
# Run async worker tests
python3 -m pytest tests/unit/automator/test_async_task_runner.py -v
# All tests pass (17/17):
✅ test_async_worker_end_to_end # Full poll → execute → update flow
✅ test_async_worker_with_none_return # Workers can return None
✅ test_concurrency_limit_respected # Semaphore limits concurrent tasks
✅ test_multiple_concurrent_tasks # Concurrent execution verified
✅ test_capacity_check_prevents_over_polling # Capacity management
✅ test_worker_exception_handling # Error handling
✅ test_token_refresh_error_handling # Auth error handling
✅ test_auth_failure_backoff # Backoff on failures
✅ test_paused_worker_stops_polling # Paused worker behavior
✅ test_adaptive_backoff_on_empty_polls # Backoff on empty queue
✅ test_task_result_serialization # Complex output handling
✅ test_all_event_types_published # All 6 event types verified
✅ test_custom_event_listener_integration # Custom SLA monitor
✅ test_multiple_event_listeners # Multiple listeners receive events
✅ test_event_listener_exception_isolation # Faulty listeners don't break worker
✅ test_event_data_accuracy # Event fields validated
✅ test_metrics_collector_receives_events # MetricsCollector integration
Test Coverage:
- Core functionality: poll, execute, update
- Concurrency: semaphore limits, concurrent execution
- Error handling: worker exceptions, HTTP errors
- Token refresh: lazy fetch, TTL refresh, error backoff
- Edge cases: None returns, paused workers, capacity limits
- Event system: All 6 event types published correctly
- Event listeners: Custom listeners, multiple listeners, exception isolation
- Event data: All fields validated (duration, task_id, output_size, etc.)
- Metrics integration: MetricsCollector receives events
Test Strategy:
- HTTP requests: Mocked (AsyncMock)
- Everything else: Real (event system, configuration, serialization)
- No external dependencies
- Fast execution
Integration Tests
For full end-to-end testing with real Conductor server:
# examples/workers_e2e.py
python3 examples/workers_e2e.py
Migration Guide
From SDK < 1.3.0 to SDK 1.3.0+
Good news: No code changes required! 🎉
AsyncTaskRunner is automatically selected for async workers. Your existing code will work identically but with better performance.
What Happens Automatically
Before (SDK < 1.3.0):
@worker_task(task_definition_name='fetch_data', thread_count=50)
async def fetch_data(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
# Used: TaskRunner + BackgroundEventLoop (3 threads)
After (SDK 1.3.0+):
@worker_task(task_definition_name='fetch_data', thread_count=50)
async def fetch_data(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
# Uses: AsyncTaskRunner (1 event loop) - AUTOMATIC!
Changes:
- ✅ Same decorator
- ✅ Same code
- ✅ Same configuration
- ✅ Same metrics
- ✅ Same events
- ✅ Better performance (automatic)
Verification
Check logs on startup to see which runner is used:
# Async worker
INFO - Created AsyncTaskRunner for async worker: fetch_data
# Sync worker
INFO - Created TaskRunner for sync worker: process_data
Rollback
If you encounter issues with AsyncTaskRunner, you can temporarily force TaskRunner by changing async def to def:
# Temporary rollback (not recommended)
@worker_task(task_definition_name='fetch_data')
def fetch_data(url: str) -> dict: # Changed from async def
# Will use TaskRunner instead of AsyncTaskRunner
import asyncio
return asyncio.run(actual_async_work(url))
Note: This defeats the purpose - only use for debugging.
Performance Impact
Expected improvements for I/O-bound async workers:
| Metric | TaskRunner (v3.2) | AsyncTaskRunner (v4.0) | Notes |
|---|---|---|---|
| Latency | Thread context switches | Direct await | Lower latency |
| Throughput | Thread pool limited | Event loop concurrent | Higher for I/O |
| Memory | Thread pool overhead | Single event loop | Lower memory |
| CPU usage | Context switching overhead | Pure async execution | Lower CPU |
Changelog
Version 4.1 (2025-11-30)
- Enhanced Worker Discovery section with comprehensive WorkerLoader documentation
- Expanded Long-Running Tasks section with detailed lease extension patterns
- Added practical examples for checkpointing and external system polling
- Consolidated content from WORKER_DISCOVERY.md and LEASE_EXTENSION.md
- Clarified concurrency control mechanisms:
- Both TaskRunner and AsyncTaskRunner use dynamic batch polling
- Batch size = thread_count - currently_running_tasks
- TaskRunner: ThreadPoolExecutor capacity limits execution
- AsyncTaskRunner: Semaphore limits execution (during execute + update)
- Semaphore held until update succeeds (ensures capacity represents fully-handled tasks)
- Implemented register_task_def functionality:
- Automatically registers task definitions on worker startup
- Generates JSON Schema (draft-07) from Python type hints
- Supports dataclasses, Optional, List, Dict, Union types
- Creates schemas named {task_name}_input and {task_name}_output
- Works for both TaskRunner and AsyncTaskRunner
- Added task_def parameter for advanced configuration (retry, timeout, rate limits)
- Added overwrite_task_def and strict_schema flags:
overwrite_task_def(default: True) - Controls whether to overwrite existing task definitionsstrict_schema(default: False) - Controls additionalProperties in JSON schemas- Both configurable via environment variables
- Applies to both sync and async workers
- Added TaskUpdateFailure event:
- Published when task update fails after all retry attempts (4 retries with exponential backoff: 10s/20s/30s)
- Contains TaskResult for recovery/logging
- Enables external handling of critical update failures
- Event count: 7 total events (was 6)
- Fixed Optional[T] handling:
- Optional[T] parameters are NOT required in schema
- Optional[T] parameters are marked nullable
- Works correctly with nested dataclasses
- Added detailed polling loop with dynamic batch sizing examples
- Improved troubleshooting guidance
- Fixed class-based worker support in TaskHandler async detection
- Fixed task_def_template passing in scan_for_annotated_workers path
Version 4.0 (2025-11-28)
- AsyncTaskRunner: Pure async/await execution (zero thread overhead)
- Auto-detection: Automatic runner selection based on function signature
- Async HTTP: httpx.AsyncClient for non-blocking operations
- Process isolation: Clients created after fork
- Comprehensive event system documentation
- HTTP-based metrics serving
Issues: https://github.com/conductor-oss/conductor-python/issues