Conductor Python SDK Examples

February 17, 2026

Quick reference for example files demonstrating SDK features.

🚀 Quick Start

# Install
pip install conductor-python httpx

# Configure
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"

# Run end-to-end example
python examples/workers_e2e.py

📁 Examples by Category

Core Workers

| File | Description | Run |
| --- | --- | --- |
| workers_e2e.py | ⭐ Start here - sync + async workers | python examples/workers_e2e.py |
| worker_example.py | Comprehensive patterns (None returns, TaskInProgress) | python examples/worker_example.py |
| fastapi_worker_service.py | FastAPI exposing a workflow as an API (+ workers) | uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1 |
| worker_configuration_example.py | Hierarchical configuration (env vars) | python examples/worker_configuration_example.py |
| task_context_example.py | Task context (logs, poll_count, task_id) | python examples/task_context_example.py |
| task_workers.py | Task worker patterns with dataclasses | python examples/task_workers.py |
| pythonic_usage.py | Pythonic API patterns and decorators | python examples/pythonic_usage.py |

Key Concepts:

  • def → TaskRunner (ThreadPoolExecutor)
  • async def → AsyncTaskRunner (pure async/await, single event loop)
  • One process per worker (automatic selection)
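
The automatic selection described above can be pictured with a few lines of standard-library Python. This is a minimal sketch, not the SDK's implementation: pick_runner is a hypothetical helper, and it assumes only that the choice depends on whether the worker is declared with def or async def.

```python
import asyncio
import inspect


def pick_runner(worker_fn) -> str:
    """Return the runner name that the Key Concepts list pairs with worker_fn."""
    # Coroutine functions (async def) map to the async runner;
    # every other callable maps to the thread-pool runner.
    if inspect.iscoroutinefunction(worker_fn):
        return "AsyncTaskRunner"
    return "TaskRunner"


def sync_worker(data: dict) -> dict:
    return {"size": len(data)}


async def async_worker(url: str) -> dict:
    await asyncio.sleep(0)  # stand-in for real I/O
    return {"url": url}


print(pick_runner(sync_worker))
print(pick_runner(async_worker))
```

The point of the sketch is that the worker author never names a runner; the function signature alone carries that information.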

Long-Running Tasks

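from typing import Union

from conductor.client.context.task_context import TaskInProgress, get_task_context
from conductor.client.worker.worker_task import worker_task

@worker_task(task_definition_name='batch_job')
def process_batch(batch_id: str) -> Union[dict, TaskInProgress]:
    ctx = get_task_context()

    if ctx.get_poll_count() < 5:
        # More work to do: keep the task in progress and ask to be polled again in 30 seconds
        return TaskInProgress(callback_after_seconds=30)

    # Fifth poll or later: report the final result
    return {'status': 'completed'}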

See: task_context_example.py, worker_example.py
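
To see how the poll-count check plays out over time, the following sketch replays the same decision without a Conductor server. InProgress and simulate are hypothetical stand-ins written for this illustration, and the sketch assumes the poll count is 1 on the first poll; check the SDK for the actual starting value.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class InProgress:
    """Hypothetical stand-in for the SDK's TaskInProgress."""
    callback_after_seconds: int


def process_batch(poll_count: int) -> Union[dict, InProgress]:
    # Same decision as the worker above, with the poll count passed in explicitly.
    if poll_count < 5:
        return InProgress(callback_after_seconds=30)
    return {"status": "completed"}


def simulate(max_polls: int = 10):
    """Call the worker with increasing poll counts until it returns a final result."""
    waited_seconds = 0
    for poll_count in range(1, max_polls + 1):
        result = process_batch(poll_count)
        if isinstance(result, InProgress):
            # The task stays in progress; add the requested callback delay.
            waited_seconds += result.callback_after_seconds
            continue
        return poll_count, waited_seconds, result
    raise RuntimeError("worker never completed")


print(simulate())
```

Under these assumptions the worker defers four times, requesting 120 seconds of callback delay in total, and completes on the fifth poll.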


Workflows

| File | Description | Run |
| --- | --- | --- |
| dynamic_workflow.py | Create workflows programmatically | python examples/dynamic_workflow.py |
| workflow_ops.py | Start, pause, resume, terminate workflows | python examples/workflow_ops.py |
| workflow_status_listner.py | Workflow event listeners | python examples/workflow_status_listner.py |
| test_workflows.py | Unit testing workflows | python -m unittest examples.test_workflows |

AI/LLM Workflows

See agentic_workflows/ for the full set of AI agent examples.

| File | Description | Run |
| --- | --- | --- |
| agentic_workflows/llm_chat.py | Automated multi-turn LLM chat | python examples/agentic_workflows/llm_chat.py |
| agentic_workflows/llm_chat_human_in_loop.py | Interactive chat with WAIT task pauses | python examples/agentic_workflows/llm_chat_human_in_loop.py |
| agentic_workflows/multiagent_chat.py | Multi-agent debate with moderator routing | python examples/agentic_workflows/multiagent_chat.py |
| agentic_workflows/function_calling_example.py | LLM picks Python functions to call | python examples/agentic_workflows/function_calling_example.py |
| agentic_workflows/mcp_weather_agent.py | AI agent with MCP tool calling | python examples/agentic_workflows/mcp_weather_agent.py "What's the weather?" |
| rag_workflow.py | RAG pipeline: markitdown, pgvector, search, answer | python examples/rag_workflow.py file.pdf "question" |

Monitoring

| File | Description | Run |
| --- | --- | --- |
| metrics_example.py | Prometheus metrics (HTTP server on :8000) | python examples/metrics_example.py |
| event_listener_examples.py | Custom event listeners (SLA, logging) | python examples/event_listener_examples.py |
| task_listener_example.py | Task lifecycle listeners | python examples/task_listener_example.py |

Access metrics: curl http://localhost:8000/metrics
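
The same endpoint can be read from Python. The sketch below fetches the page with urllib and parses the Prometheus text format into a dictionary. The metric names in SAMPLE are made up for illustration and are not the SDK's real metric names, and the parser assumes samples carry no trailing timestamp.

```python
from urllib.request import urlopen


def parse_metrics(text: str) -> dict:
    """Parse Prometheus text exposition into {"name{labels}": value}."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # The value is the last space-separated token (no timestamp assumed).
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples


def fetch_metrics(url: str = "http://localhost:8000/metrics") -> dict:
    """Fetch and parse the metrics page; requires metrics_example.py to be running."""
    with urlopen(url, timeout=5) as response:
        return parse_metrics(response.read().decode("utf-8"))


# Hypothetical sample output, used here so the parser can be tried offline.
SAMPLE = """\
# HELP demo_task_poll_total Number of polls (hypothetical metric)
# TYPE demo_task_poll_total counter
demo_task_poll_total{taskType="batch_job"} 42
demo_task_execute_seconds_sum{taskType="batch_job"} 1.5
"""

print(parse_metrics(SAMPLE))
```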


Advanced

| File | Description | Notes |
| --- | --- | --- |
| task_configure.py | Task definitions (retry, timeout, rate limits) | Programmatic task config |
| kitchensink.py | All task types (HTTP, JS, JQ, Switch) | Comprehensive |
| shell_worker.py | Execute shell commands | ⚠️ Educational only |
| untrusted_host.py | Self-signed SSL certificates | ⚠️ Dev/test only |

🎯 API Journey Examples

Complete working examples demonstrating 100% API coverage for major SDK features.

Authorization & RBAC

| File | Description | APIs |
| --- | --- | --- |
| authorization_journey.py | Complete RBAC implementation | 49 APIs |

Scenario: E-commerce platform with departments, teams, and role-based access control.

Features:

  • User, group, and application management
  • Custom roles with fine-grained permissions
  • Resource access control and audit trails
  • Automatic cleanup (use --no-cleanup to keep resources)

python examples/authorization_journey.py

Schedule Management

| File | Description | APIs |
| --- | --- | --- |
| schedule_journey.py | Complete scheduling system | 15 APIs |

Scenario: E-commerce order processing with scheduled batch workflows.

Features:

  • Schedule CRUD operations
  • Cron expressions with timezone support
  • Pause/resume schedules
  • Execution history and monitoring

python examples/schedule_journey.py

Metadata Management

| File | Description | APIs |
| --- | --- | --- |
| metadata_journey.py | Workflow & task definitions | 21 APIs |

Scenario: Online education platform with complex workflow orchestration.

Features:

  • Task and workflow definition management
  • Version control and tagging
  • Rate limiting and monitoring
  • Complex workflow patterns (SWITCH, FORK_JOIN, DECISION)

python examples/metadata_journey.py

Prompt Management

| File | Description | APIs |
| --- | --- | --- |
| prompt_journey.py | AI/LLM prompt templates | 8 APIs |

Scenario: AI-powered customer service with managed prompt templates.

Features:

  • Prompt template CRUD operations
  • Multi-language support
  • Testing with AI models
  • Version management and tagging

python examples/prompt_journey.py

RAG Pipeline Setup

Complete RAG (Retrieval Augmented Generation) pipeline example:

# 1. Install dependencies
pip install conductor-python "markitdown[pdf]"

# 2. Configure (requires Orkes Conductor with AI/LLM support)
#    - Vector DB integration named "postgres-prod" (pgvector)
#    - LLM provider named "openai" with a valid API key
export CONDUCTOR_SERVER_URL="http://localhost:7001/api"

# 3. Run RAG workflow
python examples/rag_workflow.py examples/goog-20251231.pdf "What were Google's total revenues?"

Pipeline: convert_to_markdown → LLM_INDEX_TEXT → WAIT → LLM_SEARCH_INDEX → LLM_CHAT_COMPLETE

Features:

  • Document conversion (PDF, Word, Excel → Markdown via markitdown)
  • Vector database ingestion into pgvector with OpenAI text-embedding-3-small embeddings
  • Semantic search with configurable result count
  • Context-aware answer generation with gpt-4o-mini
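
The index, search, and answer stages can be pictured with a toy in-memory version. This is only a sketch of the retrieval idea: word counts stand in for the embedding model, a Python list stands in for pgvector, the chunk texts are invented sample sentences, and no LLM is called (the last step just builds the prompt that a chat-completion task would receive).

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def search(index: list, question: str, k: int = 2) -> list:
    """Return the k chunks most similar to the question (the 'result count')."""
    q = embed(question)
    ranked = sorted(index, key=lambda item: cosine(item[1], q), reverse=True)
    return [chunk for chunk, _ in ranked[:k]]


# Invented sample chunks, standing in for the converted Markdown document.
chunks = [
    "Total revenues grew compared with the prior year.",
    "The company employs engineers in many countries.",
    "Operating expenses include research and development.",
]
index = [(chunk, embed(chunk)) for chunk in chunks]  # "index" stage

question = "What were total revenues?"
top = search(index, question, k=1)                   # "search" stage
prompt = f"Answer using this context:\n{top[0]}\n\nQuestion: {question}"
print(prompt)
```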

MCP Tool Integration Setup

MCP (Model Context Protocol) agent example:

# 1. Install MCP weather server
pip install mcp-weather-server

# 2. Start MCP server
python3 -m mcp_weather_server \
  --mode streamable-http \
  --host localhost \
  --port 3001 \
  --stateless

# 3. Run AI agent
export OPENAI_API_KEY="your-key"
export ANTHROPIC_API_KEY="your-key"
python examples/agentic_workflows/mcp_weather_agent.py "What's the weather in Tokyo?"

# Or simple mode (direct tool call):
python examples/agentic_workflows/mcp_weather_agent.py "Temperature in New York" --simple

Features:

  • MCP tool discovery
  • LLM-based planning (agent decides which tool to use)
  • Tool execution via HTTP/Streamable transport
  • Natural language response generation
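
The four features above form a loop: discover tools, pick one, run it, phrase the answer. The sketch below mimics that loop locally under loud assumptions: the registry decorator, the keyword-based plan function, and the canned weather data are all hypothetical stand-ins. A real agent would list tools from the MCP server and let an LLM do the planning.

```python
from typing import Callable, Dict

# Tool registry: name -> description and callable. A real MCP client would
# fill this from the server's tool listing instead of a local decorator.
TOOLS: Dict[str, dict] = {}


def tool(name: str, description: str):
    def register(fn: Callable) -> Callable:
        TOOLS[name] = {"description": description, "fn": fn}
        return fn
    return register


@tool("get_weather", "Current weather for a city")
def get_weather(city: str) -> dict:
    return {"city": city, "conditions": "sunny"}  # canned data, no network call


def plan(question: str) -> tuple:
    """Keyword rule standing in for the LLM planning step."""
    lowered = question.lower()
    if "weather" in lowered or "temperature" in lowered:
        # Take whatever follows the last " in " as the city name.
        city = question.rstrip("?").rsplit(" in ", 1)[-1].strip()
        return "get_weather", {"city": city}
    raise ValueError("no tool matches this question")


def answer(question: str) -> str:
    name, args = plan(question)            # planning
    result = TOOLS[name]["fn"](**args)     # tool execution
    return f"The weather in {result['city']} is {result['conditions']}."  # response


print(answer("What's the weather in Tokyo?"))
```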

🎓 Learning Path (about 45 minutes)

# 1. Basic workers (5 min)
python examples/workers_e2e.py

# 2. Long-running tasks (5 min)
python examples/task_context_example.py

# 3. Configuration (5 min)
python examples/worker_configuration_example.py

# 4. Workflows (10 min)
python examples/dynamic_workflow.py

# 5. AI/LLM Workflows (15 min)
python examples/agentic_workflows/llm_chat.py
python examples/rag_workflow.py examples/goog-20251231.pdf "What were Google's total revenues?"

# 6. Monitoring (5 min)
python examples/metrics_example.py
curl http://localhost:8000/metrics

📦 Package Structure

examples/
├── Core Workers
│   ├── workers_e2e.py                  # ⭐ Start here
│   ├── worker_example.py               # Comprehensive patterns
│   ├── worker_configuration_example.py # Env var configuration
│   ├── task_context_example.py         # Long-running tasks
│   ├── task_workers.py                 # Dataclass patterns
│   └── pythonic_usage.py               # Pythonic decorators
│
├── Workflows
│   ├── dynamic_workflow.py             # Workflow creation
│   ├── workflow_ops.py                 # Workflow management
│   ├── workflow_status_listner.py      # Workflow events
│   └── test_workflows.py               # Unit tests
│
├── AI/LLM Workflows
│   ├── rag_workflow.py                 # RAG pipeline (markitdown + pgvector)
│   └── agentic_workflows/              # Agentic AI examples
│       ├── llm_chat.py                 # Multi-turn LLM chat
│       ├── llm_chat_human_in_loop.py   # Interactive chat with WAIT
│       ├── multiagent_chat.py          # Multi-agent debate
│       ├── function_calling_example.py # LLM function calling
│       └── mcp_weather_agent.py        # MCP tool calling agent
│
├── Monitoring
│   ├── metrics_example.py              # Prometheus metrics
│   ├── event_listener_examples.py      # Custom listeners
│   └── task_listener_example.py        # Task events
│
├── Advanced
│   ├── task_configure.py               # Task definitions
│   ├── kitchensink.py                  # All features
│   ├── shell_worker.py                 # Shell commands
│   └── untrusted_host.py               # SSL handling
│
├── API Journeys
│   ├── authorization_journey.py        # ⭐ All 49 authorization APIs
│   ├── schedule_journey.py             # ⭐ All 15 schedule APIs
│   ├── metadata_journey.py             # ⭐ All 21 metadata APIs
│   └── prompt_journey.py               # ⭐ All 8 prompt APIs
│
├── helloworld/                         # Simple examples
│   ├── greetings_worker.py
│   ├── greetings_workflow.py
│   └── helloworld.py
│
├── user_example/                       # HTTP + dataclass
│   ├── models.py
│   └── user_workers.py
│
├── worker_discovery/                   # Auto-discovery
│   ├── my_workers/
│   └── other_workers/
│
└── orkes/                              # Orkes-specific features
    ├── vector_db_helloworld.py         # Vector DB operations
    ├── agentic_workflow.py             # AI agent (AIOrchestrator)
    ├── http_poll.py
    ├── sync_updates.py
    └── wait_for_webhook.py

🔧 Configuration

Worker Architecture

Multiprocess - one process per worker with automatic runner selection:

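import httpx
from conductor.client.worker.worker_task import worker_task

# Sync worker → TaskRunner (ThreadPoolExecutor)
@worker_task(task_definition_name='cpu_task', thread_count=4)
def cpu_task(data: dict):
    return expensive_computation(data)  # placeholder for your own CPU-bound function

# Async worker → AsyncTaskRunner (event loop, 67% less memory)
@worker_task(task_definition_name='api_task', thread_count=50)
async def api_task(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        # Return plain data rather than the httpx.Response object
        return {'status_code': response.status_code, 'body': response.text}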

Environment Variables

# Required
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"

# Optional - Orkes Cloud
export CONDUCTOR_AUTH_KEY="your-key"
export CONDUCTOR_AUTH_SECRET="your-secret"

# Optional - Worker config
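# Dotted names are not valid identifiers for `export` in bash or sh,
# so pass them through env(1) when launching the worker process:
env conductor.worker.all.domain=production \
    conductor.worker.all.poll_interval_millis=250 \
    conductor.worker.all.thread_count=20 \
    python examples/workers_e2e.py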
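
The hierarchical lookup behind these variables can be sketched as follows. This illustrates the idea only and is not the SDK's resolver: resolve is a hypothetical helper, and the per-worker key format conductor.worker.<name>.<property> is an assumption modelled on the conductor.worker.all.* names shown above. See worker_configuration_example.py for the behaviour the SDK actually implements.

```python
import os
from typing import Callable, Mapping, Optional


def resolve(worker: str, prop: str, default, cast: Callable = str,
            env: Optional[Mapping[str, str]] = None):
    """Look up a worker property: worker-specific key, then 'all', then the default."""
    env = os.environ if env is None else env
    candidates = (
        f"conductor.worker.{worker}.{prop}",  # assumed per-worker override
        f"conductor.worker.all.{prop}",       # global setting from the section above
    )
    for key in candidates:
        if key in env:
            return cast(env[key])
    return default  # value passed in code, used when no variable is set


# Example environment, passed explicitly so the sketch does not touch os.environ.
example_env = {
    "conductor.worker.all.thread_count": "20",
    "conductor.worker.batch_job.thread_count": "4",
}

print(resolve("batch_job", "thread_count", 1, int, example_env))
print(resolve("api_task", "thread_count", 1, int, example_env))
print(resolve("api_task", "poll_interval_millis", 100, int, example_env))
```

The most specific setting wins, so one worker can be tuned without changing the defaults that apply to all the others.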

🐛 Common Issues

Workers not polling?

  • Check task names match between workflow and @worker_task
  • Verify CONDUCTOR_SERVER_URL is correct
  • Check auth credentials
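
A quick sanity check of the server URL can rule out the most common typos before any network debugging. The helper below is a hypothetical sketch using only the standard library. Its /api rule is a heuristic taken from the URLs used in this document, not a requirement stated by the SDK.

```python
from urllib.parse import urlparse


def check_server_url(url: str) -> list:
    """Return a list of likely problems with a CONDUCTOR_SERVER_URL value."""
    problems = []
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        problems.append("URL should start with http:// or https://")
    if not parsed.netloc:
        problems.append("URL has no host")
    # Heuristic: every server URL in this document ends with /api.
    if not parsed.path.rstrip("/").endswith("/api"):
        problems.append("URL path does not end with /api")
    return problems


print(check_server_url("http://localhost:8080/api"))
print(check_server_url("http://localhost:8080"))
```

An empty list means the value has the same shape as the examples here; it does not prove that the server is reachable.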

Async workers using threads?

  • Use async def (not def)
  • Check logs for "Created AsyncTaskRunner"

High memory?

  • Use async def for I/O tasks (lower memory)
  • Reduce worker count or thread_count
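
Why async def helps for I/O-bound work can be seen without the SDK. In this sketch, 50 simulated I/O calls run concurrently on one thread of one event loop, so no per-task thread is needed. asyncio.sleep stands in for a network call, and the names are illustrative only.

```python
import asyncio
import threading
import time


async def fake_io_task(task_id: int, thread_ids: set) -> int:
    thread_ids.add(threading.get_ident())  # record which thread ran this coroutine
    await asyncio.sleep(0.05)              # stand-in for a network call
    return task_id


async def run_batch(n: int = 50):
    thread_ids: set = set()
    start = time.perf_counter()
    # All n coroutines wait concurrently on the same event loop.
    results = await asyncio.gather(*(fake_io_task(i, thread_ids) for i in range(n)))
    elapsed = time.perf_counter() - start
    return results, len(thread_ids), elapsed


results, threads_used, elapsed = asyncio.run(run_batch())
print(f"{len(results)} tasks on {threads_used} thread in {elapsed:.2f}s")
```

Run back to back, the same 50 waits would take about 2.5 seconds. Here they overlap, which is the behaviour to look for when moving I/O-heavy workers from def to async def.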

📚 Documentation

API References

Design Documents

Main Documentation


Repository: https://github.com/conductor-oss/conductor-python
License: Apache 2.0