SupervisorAgent Module

April 3, 2026

Import: from selectools.orchestration import SupervisorAgent

Stability: beta

from selectools import Agent, AgentConfig, SupervisorAgent, tool
from selectools.providers.stubs import LocalProvider

@tool(description="No-op tool")
def noop(x: str) -> str:
    return x

provider = LocalProvider()

researcher = Agent(
    tools=[noop],
    provider=provider,
    config=AgentConfig(max_iterations=1),
    system_prompt="You are a researcher.",
)
writer = Agent(
    tools=[noop],
    provider=provider,
    config=AgentConfig(max_iterations=1),
    system_prompt="You are a writer.",
)

supervisor = SupervisorAgent(
    agents={"researcher": researcher, "writer": writer},
    provider=provider,
    strategy="round_robin",
    max_rounds=2,
)

result = supervisor.run("Write a short blog post about AI safety")
print(result.content[:200])
print(f"Steps: {result.steps}")

!!! tip "See Also"
    - Orchestration - Low-level AgentGraph engine
    - Patterns - PlanAndExecute, Reflective, Debate, TeamLead patterns


Added in: v0.18.0

File: src/selectools/orchestration/supervisor.py

Classes: SupervisorAgent, SupervisorStrategy, ModelSplit

Table of Contents

  1. Overview
  2. Quick Start
  3. Strategies
  4. ModelSplit
  5. Delegation Constraints
  6. Budget & Cancellation
  7. Observers
  8. Streaming
  9. GraphResult
  10. Choosing a Strategy
  11. API Reference
  12. Examples

Overview

SupervisorAgent is a high-level multi-agent coordinator that wraps AgentGraph to provide four structured coordination strategies. Instead of building a graph manually with nodes and edges, you hand the supervisor a dict of named agents and pick a strategy. The supervisor handles planning, routing, completion detection, and replanning internally.

When to use SupervisorAgent vs raw AgentGraph

| Use case | Recommendation |
| --- | --- |
| "Run these 3 agents in a planned sequence" | SupervisorAgent (plan_and_execute) |
| "Let agents take turns collaborating" | SupervisorAgent (round_robin) |
| "Route to the best agent each step" | SupervisorAgent (dynamic) |
| "Fully autonomous multi-agent with replanning" | SupervisorAgent (magentic) |
| Custom graph topology (branches, parallel fan-out, HITL) | AgentGraph directly |
| Conditional routing with Python functions | AgentGraph directly |
| Subgraph composition | AgentGraph directly |

The supervisor builds an AgentGraph internally for each run -- you get the same execution engine, checkpointing, budget propagation, and observer events without writing graph wiring code.


Quick Start

A minimal supervisor with two agents in under 20 lines:

from selectools import Agent, SupervisorAgent, SupervisorStrategy
from selectools.providers import OpenAIProvider

provider = OpenAIProvider()

researcher = Agent(tools=[...], provider=provider, system_prompt="You are a researcher.")
writer = Agent(tools=[...], provider=provider, system_prompt="You are a writer.")

supervisor = SupervisorAgent(
    agents={"researcher": researcher, "writer": writer},
    provider=provider,
    strategy="plan_and_execute",
)

result = supervisor.run("Write a blog post about LLM safety")
print(result.content)
print(f"Total tokens: {result.total_usage.total_tokens}")

The supervisor asks the LLM to generate a JSON plan ([{"agent": "researcher", "task": "..."}, {"agent": "writer", "task": "..."}]), then executes each step sequentially, passing the output of one agent as context to the next.


Strategies

plan_and_execute

The supervisor LLM generates a structured JSON plan, then executes each step sequentially. This is the default strategy.

Flow:

graph TD
    A["User Prompt"] --> B["Supervisor LLM generates plan"]
    B --> C["Step 1: researcher executes"]
    C --> D["Output stored in state"]
    D --> E["Step 2: writer receives output"]
    E --> F["GraphResult returned"]

Usage:

supervisor = SupervisorAgent(
    agents={
        "researcher": researcher_agent,
        "writer": writer_agent,
        "reviewer": reviewer_agent,
    },
    provider=provider,
    strategy="plan_and_execute",
)

result = supervisor.run("Write a reviewed article about quantum computing")
# The LLM decides the order and task for each agent

If the supervisor LLM fails to produce valid JSON, the supervisor falls back to running every agent in registration order, giving each one the original prompt.
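The fallback described above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: `parse_plan` is a hypothetical helper, and treating an empty plan or an unknown agent name as invalid is an assumption (the text only mentions invalid JSON).

```python
import json
from typing import Dict, List


def parse_plan(raw: str, agent_names: List[str], prompt: str) -> List[Dict[str, str]]:
    """Return a list of {"agent", "task"} steps, or a registration-order fallback."""
    # Fallback: every agent, in registration order, with the original prompt.
    fallback = [{"agent": name, "task": prompt} for name in agent_names]
    try:
        plan = json.loads(raw)
    except json.JSONDecodeError:
        return fallback
    if not isinstance(plan, list) or not plan:
        return fallback
    steps = []
    for step in plan:
        # Assumption: one malformed step or unknown agent invalidates the whole plan.
        if not isinstance(step, dict) or step.get("agent") not in agent_names:
            return fallback
        steps.append({"agent": step["agent"], "task": str(step.get("task", prompt))})
    return steps
```

Calling `parse_plan("not json", ["researcher", "writer"], "Write a post")` yields one step per agent, each carrying the original prompt.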


round_robin

Agents take turns in registration order. After each full round (every agent has acted once), the supervisor checks whether the task looks complete. Runs up to max_rounds rounds.

Flow:

Round 1:
  agent_a acts --> agent_b acts --> agent_c acts
  Completion check: not done
Round 2:
  agent_a acts --> agent_b acts --> agent_c acts
  Completion check: done --> stop

Usage:

supervisor = SupervisorAgent(
    agents={
        "brainstormer": brainstorm_agent,
        "critic": critic_agent,
        "refiner": refine_agent,
    },
    provider=provider,
    strategy="round_robin",
    max_rounds=3,
)

result = supervisor.run("Design a REST API for a todo app")

Completion is detected by heuristic -- if the last agent output contains signals like "task complete", "done.", or "finished.", the supervisor stops early.
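A rough sketch of the round loop and the completion heuristic follows. The signal strings are the ones quoted above; the case-insensitive match, the `round_robin` helper, and passing each output to the next agent as context are assumptions made for illustration. Agents are modelled as plain callables rather than `Agent` instances.

```python
from typing import Callable, Dict, List

# Signals quoted in the text above; the real list may be longer.
COMPLETION_SIGNALS = ("task complete", "done.", "finished.")


def looks_complete(output: str) -> bool:
    """Substring check; case-insensitivity is an assumption."""
    lowered = output.lower()
    return any(signal in lowered for signal in COMPLETION_SIGNALS)


def round_robin(agents: Dict[str, Callable[[str], str]], prompt: str, max_rounds: int) -> List[str]:
    """Run agents in registration order; stop after a round whose last output looks complete."""
    outputs: List[str] = []
    context = prompt
    for _ in range(max_rounds):
        for name, act in agents.items():  # dicts preserve insertion order
            context = act(context)
            outputs.append(f"{name}: {context}")
        # The check runs once per full round, on the last agent's output.
        if looks_complete(context):
            break
    return outputs
```

Because the check is a plain substring match, an agent that writes "We are not done." would also trigger an early stop, which is worth keeping in mind when writing system prompts.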


dynamic

An LLM router selects the best agent for each step based on the current task state and execution history. The router can respond with "DONE" to signal completion.

Flow:

Step 1:
  Router sees: "Task: analyze data, History: none"
  Router selects: "data_loader"
  data_loader executes

Step 2:
  Router sees: "Task: analyze data, History: data_loader loaded CSV"
  Router selects: "analyst"
  analyst executes

Step 3:
  Router sees: "Task: analyze data, History: analyst produced insights"
  Router responds: "DONE"

Usage:

supervisor = SupervisorAgent(
    agents={
        "data_loader": loader_agent,
        "analyst": analyst_agent,
        "visualizer": viz_agent,
    },
    provider=provider,
    strategy="dynamic",
    max_rounds=8,
)

result = supervisor.run("Analyze sales data and create a summary")

If the router hallucinates an agent name that does not exist, the supervisor falls back to the first registered agent.
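The routing decision, including the fallback for unknown names, could look roughly like this. `resolve_router_choice` is a hypothetical helper; stripping whitespace and quotes and matching "DONE" case-insensitively are assumptions, not documented behavior.

```python
from typing import List, Optional


def resolve_router_choice(reply: str, agent_names: List[str]) -> Optional[str]:
    """Map a router reply to an agent name; None means the router said DONE."""
    choice = reply.strip().strip('"')
    if choice.upper() == "DONE":
        return None
    if choice in agent_names:
        return choice
    # Unknown (hallucinated) name: fall back to the first registered agent.
    return agent_names[0]
```

With agents registered as `["data_loader", "analyst", "visualizer"]`, a reply of `"chart_wizard"` resolves to `"data_loader"`.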


magentic

The most autonomous strategy, based on the Magentic-One pattern. The supervisor maintains two ledgers:

  1. Task Ledger -- known facts, working assumptions, and the current plan
  2. Progress Ledger -- whether the task is progressing, whether it is complete, and which agent should act next

After max_stalls consecutive unproductive steps, the supervisor replans from scratch with a fresh approach.

Flow:

Step 1:
  Supervisor produces ledger:
    task_ledger: {facts: [...], plan: ["step 1", "step 2"]}
    progress_ledger: {is_complete: false, is_progressing: true, next_agent: "researcher"}
  researcher executes

Step 2:
  Supervisor updates ledger:
    progress_ledger: {is_complete: false, is_progressing: false, next_agent: "researcher"}
  Stall detected (1/2)

Step 3:
  Supervisor updates ledger:
    progress_ledger: {is_complete: false, is_progressing: false, next_agent: "researcher"}
  Stall detected (2/2) --> max_stalls reached --> REPLAN
  on_supervisor_replan event fires
  New plan generated from scratch

Step 4:
  Supervisor updates ledger with fresh plan:
    progress_ledger: {is_complete: false, is_progressing: true, next_agent: "writer"}
  writer executes

Step 5:
  progress_ledger: {is_complete: true, next_agent: "DONE"}
  --> stop

Usage:

supervisor = SupervisorAgent(
    agents={
        "researcher": researcher_agent,
        "coder": coder_agent,
        "reviewer": reviewer_agent,
    },
    provider=provider,
    strategy="magentic",
    max_rounds=10,
    max_stalls=2,  # replan after 2 consecutive unproductive steps
)

result = supervisor.run("Build a Python CLI tool that fetches weather data")
print(f"Stalls detected: {result.stalls}")
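The stall counting in the flow above can be sketched independently of the library. The helper below takes the sequence of `is_progressing` flags from the progress ledger and reports the steps at which a replan would trigger; `replan_steps` is a hypothetical name, and resetting the counter after a replan is an assumption.

```python
from typing import Iterable, List


def replan_steps(progress_flags: Iterable[bool], max_stalls: int) -> List[int]:
    """Return the 1-based step numbers at which a replan would trigger."""
    replans: List[int] = []
    stalls = 0
    for step, is_progressing in enumerate(progress_flags, start=1):
        if is_progressing:
            stalls = 0  # progress resets the consecutive-stall counter
            continue
        stalls += 1
        if stalls >= max_stalls:
            replans.append(step)
            stalls = 0  # assumption: the counter starts over after a replan
    return replans
```

For the five-step flow shown earlier (progressing, stalled, stalled, progressing, complete) with `max_stalls=2`, the single replan lands on step 3.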

ModelSplit

Use separate models for planning and execution to reduce costs by 70-90%. The expensive model generates the plan; cheap models execute the steps.

from selectools import SupervisorAgent, ModelSplit

supervisor = SupervisorAgent(
    agents={"researcher": researcher, "writer": writer},
    provider=provider,
    strategy="plan_and_execute",
    model_split=ModelSplit(
        planner_model="gpt-4o",        # expensive: generates the plan
        executor_model="gpt-4o-mini",   # cheap: executes each step
    ),
)

result = supervisor.run("Write a technical report")
print(f"Total cost: ${result.total_usage.cost_usd:.4f}")

ModelSplit is a dataclass with two fields:

| Field | Type | Description |
| --- | --- | --- |
| planner_model | str | Model used for supervisor planning and routing calls |
| executor_model | str | Model used by agent nodes during execution |

When model_split is None (the default), the supervisor uses a default lightweight model (gpt-4o-mini if available) for planning calls. The agent nodes use whatever model their individual Agent instances are configured with.
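How much a split saves depends on the price ratio between the two models and on the share of tokens spent in execution. The arithmetic below uses made-up prices and token counts purely for illustration; none of the numbers are real provider pricing or measured usage.

```python
def run_cost(planner_tokens: int, executor_tokens: int,
             planner_price: float, executor_price: float) -> float:
    """Cost in USD given token counts and per-million-token prices."""
    return (planner_tokens * planner_price + executor_tokens * executor_price) / 1_000_000


# Hypothetical numbers for illustration only.
EXPENSIVE = 10.0  # USD per million tokens
CHEAP = 1.0       # USD per million tokens
planner_tokens, executor_tokens = 2_000, 48_000

single = run_cost(planner_tokens, executor_tokens, EXPENSIVE, EXPENSIVE)
split = run_cost(planner_tokens, executor_tokens, EXPENSIVE, CHEAP)
savings = 1 - split / single
print(f"single=${single:.3f} split=${split:.3f} savings={savings:.0%}")
```

With these assumed inputs, planning is 4% of the tokens and the cheap model costs a tenth of the expensive one, so most of the bill moves to the cheaper rate. A workload with heavier planning or a smaller price gap would save less.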


Delegation Constraints

The delegation_constraints parameter prevents infinite delegation ping-pong between agents. It maps each agent name to an explicit allow-list of agents it can delegate to.

supervisor = SupervisorAgent(
    agents={
        "planner": planner_agent,
        "worker_a": worker_a_agent,
        "worker_b": worker_b_agent,
    },
    provider=provider,
    strategy="dynamic",
    delegation_constraints={
        # worker_a can only hand off to planner (not to worker_b)
        "worker_a": ["planner"],
        # worker_b can only hand off to planner
        "worker_b": ["planner"],
        # planner can delegate to either worker
        "planner": ["worker_a", "worker_b"],
    },
)

Without constraints, dynamic and magentic strategies could produce cycles where two agents keep handing work back and forth. Constraints enforce a directed hierarchy.
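An allow-list check of this kind is small enough to sketch directly. `can_delegate` is a hypothetical helper, and treating agents without an entry as unconstrained is an assumption.

```python
from typing import Dict, List, Optional


def can_delegate(source: str, target: str,
                 constraints: Optional[Dict[str, List[str]]]) -> bool:
    """True if `source` may hand off to `target` under the allow-lists."""
    if constraints is None or source not in constraints:
        # Assumption: agents without an entry are unconstrained.
        return True
    return target in constraints[source]


constraints = {
    "worker_a": ["planner"],
    "worker_b": ["planner"],
    "planner": ["worker_a", "worker_b"],
}
print(can_delegate("worker_a", "worker_b", constraints))
```

Under the constraints from the example above, the two workers can never hand off to each other directly, so any cycle has to pass through the planner.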


Budget & Cancellation

SupervisorAgent propagates budget limits and cancellation tokens to the underlying AgentGraph.

Token and cost budgets

supervisor = SupervisorAgent(
    agents={"researcher": researcher, "writer": writer},
    provider=provider,
    strategy="plan_and_execute",
    max_total_tokens=100_000,   # graph-level token budget
    max_cost_usd=0.50,          # graph-level cost cap
    max_rounds=10,              # iteration limit
)

result = supervisor.run("Write a detailed analysis")
print(f"Tokens used: {result.total_usage.total_tokens}")

When a budget limit is hit, the graph stops gracefully and returns a partial GraphResult with whatever work was completed.
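The "stop gracefully with partial work" behavior can be pictured as a loop that checks the budget before each step. This is a sketch under assumptions: `run_with_budget` is hypothetical, steps are modelled as callables returning `(output, tokens_used)`, and the check happens before a step starts, so the last step that runs may overshoot the limit.

```python
from typing import Callable, List, Optional, Tuple


def run_with_budget(steps: List[Callable[[], Tuple[str, int]]],
                    max_total_tokens: Optional[int]) -> Tuple[List[str], int, bool]:
    """Run steps until done or the token budget is exhausted.

    Returns (outputs, tokens_used, budget_hit).
    """
    outputs: List[str] = []
    used = 0
    for step in steps:
        if max_total_tokens is not None and used >= max_total_tokens:
            return outputs, used, True  # partial result: keep completed work
        output, tokens = step()
        outputs.append(output)
        used += tokens
    return outputs, used, False
```

With three steps of 60 tokens each and a budget of 100, the first two steps run (120 tokens total) and the third is skipped.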

Cancellation

import asyncio
from selectools import CancellationToken

token = CancellationToken()

supervisor = SupervisorAgent(
    agents={"worker": worker_agent},
    provider=provider,
    strategy="round_robin",
    cancellation_token=token,
)

async def run_with_timeout():
    task = asyncio.create_task(supervisor.arun("Long-running task"))
    await asyncio.sleep(5)
    token.cancel()  # cooperative cancellation
    result = await task
    print(f"Steps completed: {result.steps}")

asyncio.run(run_with_timeout())

The cancellation token is checked at the start of each round and before each agent call. Cancellation is cooperative -- the current agent call completes, but no new calls are started.
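Cooperative cancellation boils down to checking a flag between units of work. The sketch below uses a hypothetical `SketchToken` class (not the selectools `CancellationToken`) and synchronous steps to keep the behavior easy to follow.

```python
from typing import Callable, List


class SketchToken:
    """Hypothetical stand-in for a cancellation token."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    @property
    def is_cancelled(self) -> bool:
        return self._cancelled


def run_steps(steps: List[Callable[[], str]], token: SketchToken) -> List[str]:
    """Check the token before each call; never interrupt a call in progress."""
    done: List[str] = []
    for step in steps:
        if token.is_cancelled:
            break
        done.append(step())
    return done
```

If the token is cancelled while the second of three steps is running, that step still finishes and its output is kept; the third step never starts.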


Observers

Attach AgentObserver instances to receive events from the supervisor and its underlying graph.

on_supervisor_replan

The on_supervisor_replan event fires when the magentic strategy replans from scratch after hitting max_stalls:

from selectools import AgentObserver

class SupervisorWatcher(AgentObserver):
    def on_supervisor_replan(self, run_id: str, stall_count: int, new_plan: str):
        print(f"[{run_id}] Replanned after {stall_count} stalls")
        print(f"  New plan: {new_plan[:200]}")

supervisor = SupervisorAgent(
    agents={"researcher": researcher, "coder": coder},
    provider=provider,
    strategy="magentic",
    max_stalls=2,
    observers=[SupervisorWatcher()],
)

The new_plan parameter is the raw JSON string returned by the supervisor LLM during replanning.

Graph-level observer events

Because the supervisor wraps AgentGraph, all standard graph observer events also fire: on_graph_start, on_node_start, on_node_end, on_stall_detected, on_loop_detected, and on_budget_exceeded.
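Event delivery to observers can be thought of as a simple dispatch over handler names. The sketch below is duck-typed and self-contained: `notify` and `ReplanCounter` are hypothetical, and the real `AgentObserver` base class may deliver events differently.

```python
from typing import Any, List


def notify(observers: List[Any], event: str, **payload: Any) -> int:
    """Call `event` on every observer that defines it; return how many were called."""
    called = 0
    for observer in observers:
        handler = getattr(observer, event, None)
        if callable(handler):
            handler(**payload)
            called += 1
    return called


class ReplanCounter:
    """Hypothetical observer that only cares about replans."""

    def __init__(self) -> None:
        self.replans = 0

    def on_supervisor_replan(self, run_id: str, stall_count: int, new_plan: str) -> None:
        self.replans += 1
```

An observer that implements only `on_supervisor_replan` is skipped for every other event name in this sketch, which keeps observers small and focused.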


Streaming

Use astream() to receive graph events from the supervisor execution as they happen:

import asyncio
from selectools import SupervisorAgent

supervisor = SupervisorAgent(
    agents={"researcher": researcher, "writer": writer},
    provider=provider,
    strategy="plan_and_execute",
)

async def stream_supervisor():
    async for event in supervisor.astream("Write a blog post"):
        print(f"Event: {event.type} | Node: {event.node_name}")
        if event.data:
            print(f"  Data: {str(event.data)[:100]}")

asyncio.run(stream_supervisor())

The astream() method builds a round-robin graph internally and yields GraphEvent objects with type (a GraphEventType enum) and node_name fields.
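A common way to consume such a stream is to filter on the event type and keep only what is needed. The sketch below swaps `supervisor.astream()` for a fake async generator so it runs without a provider. `SketchEvent` and the string event types `"node_start"` and `"node_end"` are hypothetical stand-ins for `GraphEvent` and the `GraphEventType` enum.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, List, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in mirroring the type, node_name, and data fields."""
    type: str
    node_name: Optional[str] = None
    data: Any = None


async def fake_stream() -> AsyncIterator[SketchEvent]:
    """Yield a fixed event sequence in place of supervisor.astream()."""
    for event in (
        SketchEvent("node_start", "researcher"),
        SketchEvent("node_end", "researcher", "notes"),
        SketchEvent("node_start", "writer"),
        SketchEvent("node_end", "writer", "draft"),
    ):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield event


async def collect_outputs() -> List[str]:
    """Keep only node_end events and format their payloads."""
    return [f"{e.node_name}: {e.data}" async for e in fake_stream() if e.type == "node_end"]


outputs = asyncio.run(collect_outputs())
print(outputs)
```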


GraphResult

All supervisor methods return a GraphResult dataclass:

| Field | Type | Description |
| --- | --- | --- |
| content | str | Last node's output (the final result) |
| state | GraphState | Final shared state after all nodes executed |
| node_results | Dict[str, List[AgentResult]] | Per-agent result lists keyed by agent name |
| trace | AgentTrace | Graph-level execution trace |
| total_usage | UsageStats | Aggregated token and cost stats across all agents |
| interrupted | bool | True if paused for human-in-the-loop |
| interrupt_id | Optional[str] | Checkpoint ID for graph.resume() |
| steps | int | Total graph-level iterations executed |
| stalls | int | Number of stall events detected |
| loops_detected | int | Number of hard loop events detected |

result = supervisor.run("Analyze this dataset")

# Final content
print(result.content)

# Per-agent results
for agent_name, results in result.node_results.items():
    for r in results:
        print(f"  {agent_name}: {r.content[:80]}")

# Cost tracking
print(f"Total tokens: {result.total_usage.total_tokens}")
print(f"Total cost: ${result.total_usage.cost_usd:.4f}")

# Execution metadata
print(f"Steps: {result.steps}, Stalls: {result.stalls}")

Choosing a Strategy

| Criteria | plan_and_execute | round_robin | dynamic | magentic |
| --- | --- | --- | --- | --- |
| Autonomy | Low | Low | Medium | High |
| Cost | Lowest (with ModelSplit) | Medium | Medium | Highest |
| Predictability | High (fixed plan) | High (fixed order) | Medium | Low |
| Handles stalls | No | No | No | Yes (auto-replan) |
| Best for | Known workflows | Collaborative refinement | Heterogeneous agents | Open-ended tasks |
| LLM calls overhead | 1 (plan) | 0 | 1 per step (routing) | 1 per step (ledger) |

Rules of thumb:

  • Start with plan_and_execute -- it is the simplest and cheapest, especially with ModelSplit.
  • Use round_robin when every agent should contribute each round (brainstorm/critique/refine loops).
  • Use dynamic when you have specialized agents and the optimal sequence depends on intermediate results.
  • Use magentic for complex, open-ended tasks where the supervisor needs to detect dead ends and try a different approach.
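The rules of thumb can be condensed into a small decision helper. This is only one possible reading: `pick_strategy` and its three flags are hypothetical, and the priority order among the rules is an assumption.

```python
def pick_strategy(order_known: bool, all_contribute_each_round: bool,
                  needs_replanning: bool) -> str:
    """Encode the rules of thumb; the priority order is an assumption."""
    if needs_replanning:
        return "magentic"          # open-ended tasks that may hit dead ends
    if all_contribute_each_round:
        return "round_robin"       # brainstorm/critique/refine loops
    if order_known:
        return "plan_and_execute"  # simplest and cheapest starting point
    return "dynamic"               # sequence depends on intermediate results


print(pick_strategy(order_known=True, all_contribute_each_round=False, needs_replanning=False))
```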

API Reference

SupervisorAgent.__init__() Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agents | Dict[str, Agent] | (required) | Named agent instances. Keys are the names used in plans and routing. |
| provider | Provider | (required) | LLM provider for supervisor planning/routing calls. |
| strategy | SupervisorStrategy | "plan_and_execute" | Coordination strategy. Accepts enum or string. |
| max_rounds | int | 10 | Maximum coordination rounds before stopping. |
| max_stalls | int | 2 | Magentic only: consecutive unproductive steps before replanning. |
| model_split | Optional[ModelSplit] | None | Separate models for planning vs execution. |
| delegation_constraints | Optional[Dict[str, List[str]]] | None | Per-agent allow-lists to prevent delegation loops. |
| cancellation_token | Optional[CancellationToken] | None | Token for cooperative cancellation. |
| max_total_tokens | Optional[int] | None | Graph-level cumulative token budget. |
| max_cost_usd | Optional[float] | None | Graph-level cumulative cost cap in USD. |
| observers | Optional[List[AgentObserver]] | None | Observer instances for events. |

Methods

| Method | Signature | Description |
| --- | --- | --- |
| run() | run(prompt: str) -> GraphResult | Synchronous execution. |
| arun() | async arun(prompt: str) -> GraphResult | Asynchronous execution. |
| astream() | async astream(prompt: str) -> AsyncGenerator[GraphEvent, None] | Stream graph events. |

Examples

See examples/60_supervisor_agent.py for a runnable demo of all four strategies using mock agents (no API keys needed).



Next Steps: Learn about building custom graphs in the AgentGraph Module.


| # | Script | Description |
| --- | --- | --- |
| 60 | 60_supervisor_agent.py | All four supervisor strategies with mock agents |