Agent Module

June 11, 2026

Import: from selectools import Agent, AgentConfig
Stability: stable
Since: v0.13.0

from selectools import Agent, AgentConfig, tool
from selectools.providers.stubs import LocalProvider

@tool()
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

agent = Agent(
    tools=[search],
    provider=LocalProvider(),
    config=AgentConfig(model="gpt-4o"),
)
result = agent.run("Find Python tutorials")
print(result.content)   # Agent response
print(result.usage)     # Token counts + cost

See Also

  • Tools: @tool decorator and tool system
  • Streaming: token-level async streaming
  • Memory: conversation history management
  • Providers: OpenAI, Anthropic, Gemini, Ollama adapters


File: src/selectools/agent/core.py
Classes: Agent, AgentConfig

Table of Contents

  1. Overview
  2. Agent Loop Lifecycle
  3. Tool Selection Process
  4. Configuration
  5. Retry and Error Handling
  6. Sync vs Async Execution
  7. Hook System (Removed)
  8. AgentObserver Protocol
  9. Memory Integration
  10. Streaming
  11. Parallel Tool Execution
  12. Response Caching
  13. Structured Output
  14. Execution Traces
  15. Reasoning Visibility
  16. Provider Fallback
  17. Batch Processing
  18. Tool Policy & Human-in-the-Loop
  19. Terminal Actions
  20. Implementation Details

Overview

The Agent class is the central orchestrator of the selectools framework. It manages the iterative loop of sending messages to an LLM, parsing responses for tool calls, executing those tools, and feeding results back until the task is complete.

Key Responsibilities

  1. Conversation Management: Maintain message history with optional memory
  2. Provider Communication: Call LLM APIs through provider abstraction (with fallback)
  3. Tool Orchestration: Detect, validate, enforce policies, and execute tool calls
  4. Structured Output: Validate LLM responses against Pydantic/JSON Schema with auto-retry
  5. Execution Traces: Record structured timeline of every step (AgentTrace)
  6. Reasoning Visibility: Extract and surface why the agent chose a tool
  7. Error Recovery: Handle failures with retries and backoff
  8. Observability: Notify lifecycle observers for monitoring
  9. Cost Tracking: Monitor token usage and costs
  10. Analytics: Track tool usage patterns (optional)
  11. Parallel Execution: Execute independent tool calls concurrently
  12. Batch Processing: Process multiple prompts concurrently
  13. Streaming: Token-level streaming with native tool support
  14. Response Caching: Avoid redundant LLM calls via pluggable cache layer
  15. Tool Policy & HITL: Declarative allow/review/deny rules with human approval

Properties & Convenience Methods

| Property / Method | Description |
|---|---|
| agent.name | Returns config.name (default: "agent"). Useful for multi-agent identification. |
| agent(messages, \*\*kw) | Shorthand for agent.run(messages, \*\*kw) via __call__. |
| agent.ask(prompt) | Shorthand for run() with a single string prompt. |
| agent.aask(prompt) | Async version of ask(). |

# Named agents for multi-agent systems
researcher = Agent(tools=[search], config=AgentConfig(name="researcher"))
print(researcher.name)  # "researcher"

# Call the agent directly
result = researcher("Find info about Python")  # same as researcher.run(...)

Core Dependencies

from .types import Message, Role
from .tools import Tool
from .prompt import PromptBuilder
from .parser import ToolCallParser
from .structured import parse_and_validate, build_schema_instruction
from .trace import AgentTrace, TraceStep
from .policy import ToolPolicy, PolicyDecision
from .providers.base import Provider
from .providers.fallback import FallbackProvider
from .memory import ConversationMemory  # Optional
from .usage import AgentUsage
from .analytics import AgentAnalytics  # Optional

Agent Loop Lifecycle

State Machine Diagram

flowchart TD
    START([START]) --> LOAD["Load Message History\n(from memory if set)"]
    LOAD --> LOOP["Iteration Loop\n(max_iterations times)"]
    LOOP --> HOOK["Notify observers\n(on_iteration_start)"]
    HOOK --> BUILD["Build Prompt with Tools"]
    BUILD --> LLM["Call LLM Provider\n(with retries)"]
    LLM --> PARSE["Parse Response\n(ToolCallParser)"]
    PARSE --> TC{Tool call?}
    TC -- No --> RETURN["Return Final Response"]
    TC -- Yes --> VALIDATE{"Valid tool\n& params?"}
    VALIDATE -- Invalid --> ERR["Append Error Message"]
    ERR --> LOOP
    VALIDATE -- Valid --> EXEC["Execute Tool\n(with timeout)"]
    EXEC --> APPEND["Append Result to History"]
    APPEND --> LOOP

Execution Flow

1. Initialization

agent = Agent(
    tools=[search_tool, calculator_tool],
    provider=OpenAIProvider(),
    config=AgentConfig(max_iterations=6),
    memory=ConversationMemory(max_messages=20)
)

The agent initializes with:

  • Tool registry (_tools_by_name dict for O(1) lookup)
  • System prompt (built from tool schemas)
  • Empty history
  • Usage tracker
  • Optional analytics tracker

2. Run Method Entry

response = agent.run([
    Message(role=Role.USER, content="Search for Python and calculate 2+2")
])

Steps:

  1. Notify observers via on_run_start (formerly the on_agent_start hook)
  2. Load history from memory (if available)
  3. Append new messages to history
  4. Enter iteration loop

3. Iteration Loop

iteration = 0
while iteration < self.config.max_iterations:
    iteration += 1

    # 1. Call hook
    self._call_hook("on_iteration_start", iteration, self._history)

    # 2. Call provider
    response_text = self._call_provider()

    # 3. Parse response
    parse_result = self.parser.parse(response_text)

    # 4. Check for tool call
    if not parse_result.tool_call:
        # No tool call - we're done!
        return Message(role=Role.ASSISTANT, content=response_text)

    # 5. Execute tool
    tool_name = parse_result.tool_call.tool_name
    parameters = parse_result.tool_call.parameters
    tool = self._tools_by_name.get(tool_name)
    result = self._execute_tool_with_timeout(tool, parameters)

    # 6. Append to history
    self._append_assistant_and_tool(response_text, result, tool_name)

    # 7. Loop continues...

4. Tool Execution

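from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError

def _execute_tool_with_timeout(self, tool, parameters, chunk_callback=None):
    # No timeout configured: run the tool directly on the calling thread
    if not self.config.tool_timeout_seconds:
        return tool.execute(parameters, chunk_callback)

    # Execute in a worker thread so the wait can be bounded
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(tool.execute, parameters, chunk_callback)
        try:
            return future.result(timeout=self.config.tool_timeout_seconds)
        except FutureTimeoutError:
            # cancel() only stops a call that has not started yet;
            # a tool that is already running is not interrupted
            future.cancel()
            raise TimeoutError(f"Tool '{tool.name}' timed out")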

Features:

  • Optional timeout enforcement
  • Chunk callback for streaming tools
  • Exception wrapping for better errors
  • Analytics tracking (if enabled)
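
The timeout behaviour can be tried in isolation with the standard library alone. The sketch below is not the selectools implementation: `run_with_timeout`, `slow_tool`, and `fast_tool` are hypothetical names. It shuts the executor down with `wait=False`, because leaving a `with ThreadPoolExecutor(...)` block waits for running work to finish, which would delay the timeout error until the tool returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def run_with_timeout(fn, timeout_seconds, *args):
    """Run fn(*args) in a worker thread and stop waiting after timeout_seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeout:
        raise TimeoutError(f"{fn.__name__} timed out after {timeout_seconds}s") from None
    finally:
        # wait=False returns immediately; the worker thread still runs to completion.
        executor.shutdown(wait=False)

def slow_tool(x):
    time.sleep(0.3)
    return x

def fast_tool(x):
    return x * 2

fast_result = run_with_timeout(fast_tool, 1.0, 21)
try:
    run_with_timeout(slow_tool, 0.05, "late")
    timed_out = False
except TimeoutError:
    timed_out = True
```

Python threads cannot be killed from outside, so a timed-out tool keeps consuming resources until it returns on its own. Tools that may hang should enforce their own I/O timeouts as well.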

5. Loop Termination

The loop exits when:

  1. No tool call detected → Return LLM response as final answer
  2. Max iterations reached → Return timeout message
  3. Exception raised → Propagate to caller
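
The exit conditions above can be reproduced with a toy loop. This is only a sketch under stated assumptions: `run_loop`, `scripted_llm`, `always_calls_tool`, and the `TOOL_CALL:` text format are hypothetical stand-ins for the provider and parser, not selectools APIs.

```python
from typing import Callable, Dict, List

def run_loop(llm: Callable[[List[str]], str],
             tools: Dict[str, Callable[[str], str]],
             prompt: str,
             max_iterations: int = 6) -> str:
    """Toy agent loop; an exception raised by `llm` propagates to the caller."""
    history = [f"USER: {prompt}"]
    for _ in range(max_iterations):
        reply = llm(history)
        if not reply.startswith("TOOL_CALL:"):
            return reply                      # exit 1: no tool call, final answer
        name, _, arg = reply[len("TOOL_CALL:"):].strip().partition(" ")
        result = tools[name](arg)
        history += [f"ASSISTANT: {reply}", f"TOOL: {result}"]
    return "Stopped: max_iterations reached"  # exit 2: iteration budget spent

def scripted_llm(history: List[str]) -> str:
    # Ask for one tool call, then answer once a tool result is in history.
    if any(m.startswith("TOOL:") for m in history):
        return "The answer is 4."
    return "TOOL_CALL: calc 2+2"

def always_calls_tool(history: List[str]) -> str:
    return "TOOL_CALL: calc 1+1"

tools = {"calc": lambda expr: str(sum(int(x) for x in expr.split("+")))}
final = run_loop(scripted_llm, tools, "What is 2+2?")
capped = run_loop(always_calls_tool, tools, "loop forever", max_iterations=3)
```

`final` holds the model's plain-text answer, while `capped` shows what happens when the model never stops requesting tools.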

Tool Selection Process

How the Agent Decides Which Tool to Use

The agent does not choose a tool itself. It relies on the LLM to make that decision based on the system prompt and conversation context.

graph TD
    A["System Prompt (PromptBuilder)\nTool call contract + tool JSON schemas"] --> C["LLM\nGPT-4o / Claude / Gemini / etc."]
    B["Conversation History\nUSER: Search for Python tutorials"] --> C
    C --> D["LLM Response\nTOOL_CALL: search\nparams: query = 'Python tutorials'"]

Validation Flow

# 1. Parse the tool call
parse_result = self.parser.parse(response_text)

if parse_result.tool_call:
    tool_name = parse_result.tool_call.tool_name
    parameters = parse_result.tool_call.parameters

    # 2. Check if tool exists
    tool = self._tools_by_name.get(tool_name)
    if not tool:
        error_msg = f"Unknown tool '{tool_name}'. Available: {list(self._tools_by_name.keys())}"
        # Append error and continue loop
        self._append_assistant_and_tool(response_text, error_msg, tool_name)
        continue

    # 3. Validate parameters
    try:
        tool.validate(parameters)
    except ToolValidationError as e:
        # Append validation error and continue
        self._append_assistant_and_tool(response_text, str(e), tool_name)
        continue

    # 4. Execute tool
    result = tool.execute(parameters)

Error Handling Strategy:

The agent doesn't fail on invalid tool calls. Instead:

  1. Append error message to conversation
  2. Let LLM see the error
  3. LLM can retry with corrections or choose a different approach

This creates a self-correcting loop.
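
A minimal illustration of that recovery path, assuming a plain list of strings as history. `TOOLS` and `handle_tool_call` are hypothetical helpers for this sketch, not part of selectools.

```python
from typing import Callable, Dict, List

TOOLS: Dict[str, Callable[[str], str]] = {"search": lambda q: f"Results for: {q}"}

def handle_tool_call(name: str, arg: str, history: List[str]) -> None:
    """Append either the tool result or an error the model can react to."""
    tool = TOOLS.get(name)
    if tool is None:
        history.append(f"TOOL: Unknown tool '{name}'. Available: {list(TOOLS)}")
        return
    history.append(f"TOOL: {tool(arg)}")

history: List[str] = []
handle_tool_call("serach", "Python", history)   # typo: error is recorded, nothing raises
handle_tool_call("search", "Python", history)   # corrected call on the next iteration
```

Because the error text lists the available tool names, the model has what it needs to correct the call on its next turn.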


Configuration

AgentConfig Dataclass

@dataclass
class AgentConfig:
    # Model selection
    model: str = "gpt-5-mini"
    temperature: float = 0.0
    max_tokens: int = 1000

    # Loop control
    max_iterations: int = 6

    # Reliability
    max_retries: int = 2
    retry_backoff_seconds: float = 1.0
    rate_limit_cooldown_seconds: float = 5.0
    request_timeout: Optional[float] = 30.0
    tool_timeout_seconds: Optional[float] = None

    # Cost management
    cost_warning_threshold: Optional[float] = None

    # Observability
    verbose: bool = False
    enable_analytics: bool = False
    observers: List[AgentObserver] = field(default_factory=list)

    # Execution mode
    routing_only: bool = False
    parallel_tool_execution: bool = True

    # Streaming
    stream: bool = False

    # Caching
    cache: Optional[Cache] = None  # InMemoryCache, RedisCache, or custom

    # Tool Safety
    tool_policy: Optional[ToolPolicy] = None  # allow/review/deny rules
    confirm_action: Optional[ConfirmAction] = None  # Human-in-the-loop callback
    approval_timeout: float = 60.0  # Seconds before auto-deny

    # Sessions & Persistence (v0.16.0)
    session_store: Optional[SessionStore] = None  # Auto-save/load conversation state
    session_id: Optional[str] = None  # Unique session identifier

    # Summarize-on-Trim (v0.16.0)
    summarize_on_trim: bool = False  # Summarize trimmed messages before dropping
    summarize_provider: Optional[Provider] = None  # Provider for summarization (defaults to agent's)
    summarize_model: Optional[str] = None  # Model for summarization (use a cheap model)
    summarize_max_tokens: int = 150  # Max tokens for the summary response

    # Advanced Memory (v0.16.0)
    entity_memory: Optional[EntityMemory] = None  # LLM-based entity extraction
    knowledge_graph: Optional[KnowledgeGraphMemory] = None  # Relationship triple extraction
    knowledge_memory: Optional[KnowledgeMemory] = None  # Cross-session durable memory

Configuration Patterns

Production Config

config = AgentConfig(
    model="gpt-4o-mini",  # Cost-effective
    temperature=0.0,      # Deterministic
    max_tokens=2000,
    max_iterations=10,
    max_retries=3,
    retry_backoff_seconds=2.0,
    rate_limit_cooldown_seconds=10.0,
    request_timeout=60.0,
    tool_timeout_seconds=30.0,
    cost_warning_threshold=0.50,  # Alert at $0.50
    verbose=False,
    enable_analytics=True
)

Production Config with Caching

from selectools import InMemoryCache

cache = InMemoryCache(max_size=2000, default_ttl=600)
config = AgentConfig(
    model="gpt-4o-mini",
    temperature=0.0,
    cache=cache,               # Enable response caching
    max_retries=3,
    cost_warning_threshold=0.50,
)

Development Config

config = AgentConfig(
    model="gpt-4o",
    verbose=True,         # See what's happening
    max_iterations=3,     # Fast feedback
    stream=True,          # See responses live
)

Budget-Conscious Config

config = AgentConfig(
    model="gpt-4o-mini",
    max_tokens=500,
    max_iterations=3,
    cost_warning_threshold=0.01,
)

Retry and Error Handling

Retry Logic Flow

flowchart TD
    A["Provider Call"] --> B["Attempt N"]
    B --> C{Success?}
    C -- Yes --> D["Return response"]
    C -- No --> E{Rate limit error?}
    E -- Yes --> F["Sleep(rate_limit_cooldown * attempt)"]
    E -- No --> G["Sleep(retry_backoff * attempt)"]
    F --> G
    G --> H{Retries remaining?}
    H -- Yes --> B
    H -- No --> I["Return error message"]

Implementation

def _call_provider(self, stream_handler=None):
    attempts = 0
    last_error = None

    while attempts <= self.config.max_retries:
        attempts += 1

        try:
            # Call provider
            response_text, usage_stats = self.provider.complete(
                model=self.config.model,
                system_prompt=self._system_prompt,
                messages=self._history,
                temperature=self.config.temperature,
                max_tokens=self.config.max_tokens,
                timeout=self.config.request_timeout,
            )

            # Track usage
            self.usage.add_usage(usage_stats)

            return response_text

        except ProviderError as exc:
            last_error = str(exc)

            if attempts > self.config.max_retries:
                break

            # Rate limit handling
            if self._is_rate_limit_error(last_error):
                time.sleep(self.config.rate_limit_cooldown_seconds * attempts)

            # Standard backoff
            if self.config.retry_backoff_seconds:
                time.sleep(self.config.retry_backoff_seconds * attempts)

    return f"Provider error: {last_error or 'unknown error'}"

Rate Limit Detection

def _is_rate_limit_error(self, message: str) -> bool:
    lowered = message.lower()
    return "rate limit" in lowered or "429" in lowered
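
To make the delay arithmetic concrete, the sketch below reproduces the sleep schedule implied by the `_call_provider` listing, using the default `retry_backoff_seconds=1.0` and `rate_limit_cooldown_seconds=5.0`. `retry_delay` is a hypothetical helper written for this example. It assumes, as in the listing, that a rate-limit error sleeps for the cooldown and then for the standard backoff.

```python
def is_rate_limit_error(message: str) -> bool:
    lowered = message.lower()
    return "rate limit" in lowered or "429" in lowered

def retry_delay(attempt: int, error: str,
                retry_backoff_seconds: float = 1.0,
                rate_limit_cooldown_seconds: float = 5.0) -> float:
    """Seconds slept after failed attempt number `attempt` (1-based)."""
    delay = retry_backoff_seconds * attempt
    if is_rate_limit_error(error):
        delay += rate_limit_cooldown_seconds * attempt  # both sleeps apply
    return delay

plain = [retry_delay(n, "connection reset") for n in (1, 2)]
limited = [retry_delay(n, "HTTP 429 Too Many Requests") for n in (1, 2)]
```

With `max_retries=2` there are up to three attempts and two sleeps: 1s then 2s for ordinary errors, 6s then 12s for rate limits. The substring check also matches any error message that happens to contain "429", which is worth keeping in mind when reading retry logs.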

Tool Execution Errors

Tool errors don't cause the entire agent to fail:

try:
    start = time.perf_counter()
    result = self._execute_tool_with_timeout(tool, parameters)
    duration = time.perf_counter() - start  # elapsed seconds
    self._call_hook("on_tool_end", tool.name, result, duration)

except Exception as exc:
    self._call_hook("on_tool_error", tool.name, exc, parameters)

    error_message = f"Error executing tool '{tool.name}': {exc}"
    self._append_assistant_and_tool(response_text, error_message, tool.name)

    # Continue to next iteration - let LLM handle the error
    continue

Sync vs Async Execution

run(), arun(), and astream() share the same parameters and feature set (as of v0.16.3), except that stream_handler applies to run() and arun() only:

| Parameter | Type | Default | Description |
|---|---|---|---|
| messages | str \| List[Message] | required | User prompt or message list |
| stream_handler | Callable[[str], None] | None | Callback for streaming chunks (run/arun only) |
| response_format | ResponseFormat | None | Pydantic model or JSON Schema for structured output |
| parent_run_id | str | None | Links trace to a parent agent's run for nested orchestration |

Sync Execution (run())

response = agent.run([Message(role=Role.USER, content="Hello")])

When to use:

  • Simple scripts
  • Jupyter notebooks
  • Single-threaded applications
  • Blocking I/O is acceptable

Async Execution (arun())

response = await agent.arun([Message(role=Role.USER, content="Hello")])

When to use:

  • Web frameworks (FastAPI, aiohttp)
  • Concurrent operations
  • High-performance applications
  • Multiple agents in parallel

Implementation Differences

Sync Path

def run(self, messages, stream_handler=None):
    # Provider call (blocking)
    response_text, usage_stats = self.provider.complete(...)

    # Tool execution (blocking)
    result = tool.execute(parameters)

Async Path

async def arun(self, messages, stream_handler=None):
    # Provider call (non-blocking)
    if hasattr(self.provider, "acomplete"):
        response_text, usage_stats = await self.provider.acomplete(...)
    else:
        # Fallback: run sync in executor
        loop = asyncio.get_event_loop()
        with ThreadPoolExecutor() as executor:
            response_text, usage_stats = await loop.run_in_executor(
                executor, lambda: self.provider.complete(...)
            )

    # Tool execution (non-blocking)
    result = await tool.aexecute(parameters)

Async Tool Support

Tools can be async:

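import aiohttp

from selectools import tool

@tool(description="Fetch data from API")
async def fetch_data(url: str) -> str:
    # One session per call keeps the example short; reuse a session in real code
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()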

The agent automatically detects and handles async tools via the tool.is_async flag.
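
One plausible way to derive such a flag and dispatch on it is `inspect.iscoroutinefunction`. The sketch below is an assumption about the general technique, not a description of how selectools sets `is_async`. `is_async_tool` and `call_tool` are hypothetical names.

```python
import asyncio
import inspect

def is_async_tool(fn) -> bool:
    return inspect.iscoroutinefunction(fn)

async def call_tool(fn, *args):
    """Await coroutine tools directly; run plain functions in a worker thread."""
    if is_async_tool(fn):
        return await fn(*args)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: fn(*args))

def sync_upper(text: str) -> str:
    return text.upper()

async def async_reverse(text: str) -> str:
    await asyncio.sleep(0)
    return text[::-1]

async def main():
    return await call_tool(sync_upper, "abc"), await call_tool(async_reverse, "abc")

results = asyncio.run(main())
```

Running sync tools in an executor keeps a blocking function from stalling the event loop while other coroutines are waiting.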


Hook System (Removed)

Removed in v1.0: AgentConfig.hooks (deprecated since v0.16.5) has been removed. Passing hooks= to AgentConfig now raises TypeError. Use AgentObserver or AsyncAgentObserver instead. See docs/MIGRATION_1.0.md and docs/decisions/002-observer-replaces-hooks.md.

Migration is mechanical — each hook key maps to an observer method with richer arguments (run_id, and call_id for tool events):

| Legacy hook key | Observer method |
|---|---|
| on_agent_start | on_run_start(run_id, messages, system_prompt) |
| on_agent_end | on_run_end(run_id, result) |
| on_iteration_start | on_iteration_start(run_id, iteration, messages) |
| on_iteration_end | on_iteration_end(run_id, iteration, response) |
| on_tool_start | on_tool_start(run_id, call_id, tool_name, tool_args) |
| on_tool_chunk | on_tool_chunk(run_id, call_id, tool_name, chunk) |
| on_tool_end | on_tool_end(run_id, call_id, tool_name, result, duration_ms) |
| on_tool_error | on_tool_error(run_id, call_id, tool_name, error, tool_args, duration_ms) |
| on_llm_start | on_llm_start(run_id, messages, model, system_prompt) |
| on_llm_end | on_llm_end(run_id, response, usage) |
| on_error | on_error(run_id, error, context) |

Note: hook duration was in seconds; observer duration_ms is in milliseconds.

# Before (removed)
config = AgentConfig(hooks={"on_tool_start": lambda name, args: log(name)})

# After
from selectools import AgentObserver

class ToolLogger(AgentObserver):
    def on_tool_start(self, run_id, call_id, tool_name, tool_args):
        log(tool_name)

config = AgentConfig(observers=[ToolLogger()])

Design Decision (unchanged): Observer errors never break the agent. They're for observability, not control flow.
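
The "errors never break the agent" rule amounts to wrapping each observer call in its own try/except. The dispatcher below is a hypothetical sketch of that pattern. `notify`, `Broken`, and `Recorder` are illustration names, and the real dispatch code may differ.

```python
import logging
from typing import Any, Iterable

logger = logging.getLogger("observer_sketch")

def notify(observers: Iterable[Any], event: str, *args: Any) -> int:
    """Call `event` on each observer that defines it; return how many succeeded."""
    delivered = 0
    for observer in observers:
        handler = getattr(observer, event, None)
        if handler is None:
            continue
        try:
            handler(*args)
            delivered += 1
        except Exception:
            # Observability must not affect control flow: log and move on.
            logger.exception("observer %r failed in %s", observer, event)
    return delivered

class Broken:
    def on_tool_start(self, run_id, call_id, tool_name, tool_args):
        raise RuntimeError("boom")

class Recorder:
    def __init__(self):
        self.seen = []

    def on_tool_start(self, run_id, call_id, tool_name, tool_args):
        self.seen.append(tool_name)

recorder = Recorder()
delivered = notify([Broken(), recorder], "on_tool_start", "run-1", "call-1", "search", {})
```

The failing observer is logged and skipped, and the second observer still receives the event.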


AgentObserver Protocol

File: src/selectools/observer.py
Classes: AgentObserver, LoggingObserver

The AgentObserver protocol is the class-based notification system for structured observability integrations (Langfuse, OpenTelemetry, Datadog). Every callback receives a run_id for cross-request correlation, and tool callbacks also receive a call_id for matching parallel tool start/end pairs.

Quick Start

from selectools import Agent, AgentConfig, AgentObserver, LoggingObserver

class MyObserver(AgentObserver):
    def on_llm_start(self, run_id, messages, model, system_prompt):
        print(f"[{run_id}] LLM call to {model}")

    def on_tool_end(self, run_id, call_id, tool_name, result, duration_ms):
        print(f"[{run_id}] {tool_name} finished in {duration_ms:.1f}ms")

agent = Agent(
    tools=[...], provider=provider,
    config=AgentConfig(observers=[MyObserver(), LoggingObserver()]),
)

Lifecycle Events

| Event | Scope | Parameters (after run_id) | When |
|---|---|---|---|
| on_run_start | Run | messages, system_prompt | Start of run()/arun()/astream() |
| on_run_end | Run | result (AgentResult) | Agent produces final result |
| on_error | Run | error, context | Unrecoverable error |
| on_llm_start | LLM | messages, model, system_prompt | Before each provider call |
| on_llm_end | LLM | response, usage | After each provider call |
| on_cache_hit | LLM | model, response | Response served from cache |
| on_usage | LLM | usage (UsageStats) | Per-call token/cost stats |
| on_llm_retry | LLM | attempt, max_retries, error, backoff_seconds | LLM call about to be retried |
| on_tool_start | Tool | call_id, tool_name, tool_args | Before tool execution |
| on_tool_end | Tool | call_id, tool_name, result, duration_ms | After successful tool execution |
| on_tool_error | Tool | call_id, tool_name, error, tool_args, duration_ms | Tool raised an exception |
| on_tool_chunk | Tool | call_id, tool_name, chunk | Streaming tool emits a chunk |
| on_iteration_start | Iteration | iteration, messages | Start of agent loop iteration |
| on_iteration_end | Iteration | iteration, response | End of agent loop iteration |
| on_batch_start | Batch | batch_id*, prompts_count | Before batch()/abatch() |
| on_batch_end | Batch | batch_id*, results_count, errors_count, total_duration_ms | After all batch items complete |
| on_policy_decision | Policy | tool_name, decision, reason, tool_args | After tool policy evaluation |
| on_structured_validate | Structured | success, attempt, error | After structured output validation |
| on_provider_fallback | Fallback | failed_provider, next_provider, error | FallbackProvider switches provider |
| on_memory_trim | Memory | messages_removed, messages_remaining, reason | Memory enforces limits |
| on_session_load | Session | session_id, message_count | Session loaded from store (v0.16.0) |
| on_session_save | Session | session_id, message_count | Session saved to store (v0.16.0) |
| on_memory_summarize | Memory | summary, messages_summarized | Trimmed messages summarized (v0.16.0) |
| on_entity_extraction | Memory | entities, turn_count | Entities extracted from turn (v0.16.0) |

*on_batch_start/on_batch_end use batch_id instead of run_id.
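
The value of call_id shows up when two calls to the same tool overlap. The sketch below uses a plain class with the observer method names from the table and invokes the callbacks by hand; in real use the class would subclass AgentObserver and the agent would call it. `PairingObserver` is a hypothetical name.

```python
class PairingObserver:
    """Matches tool start/end events by call_id rather than by tool name."""

    def __init__(self):
        self.in_flight = {}   # call_id -> tool_args of the running call
        self.finished = []    # (query, result) pairs in completion order

    def on_tool_start(self, run_id, call_id, tool_name, tool_args):
        self.in_flight[call_id] = tool_args

    def on_tool_end(self, run_id, call_id, tool_name, result, duration_ms):
        args = self.in_flight.pop(call_id)
        self.finished.append((args["query"], result))

obs = PairingObserver()
# Two parallel calls to the same tool, finishing out of order.
obs.on_tool_start("run-1", "c1", "search", {"query": "a"})
obs.on_tool_start("run-1", "c2", "search", {"query": "b"})
obs.on_tool_end("run-1", "c2", "search", "B", 12.0)
obs.on_tool_end("run-1", "c1", "search", "A", 30.0)
```

Keying on tool_name alone would have mixed up the two searches, while call_id pairs each result with the arguments that produced it.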

Built-in LoggingObserver

Emits structured JSON events to Python's logging module:

import logging
logging.basicConfig(level=logging.INFO)

agent = Agent(
    tools=[...], provider=provider,
    config=AgentConfig(observers=[LoggingObserver()]),
)

Output:

{"event": "run_start", "run_id": "a3f2...", "model": "gpt-4o-mini", "timestamp": 1708099200.0}
{"event": "llm_end", "run_id": "a3f2...", "tokens": 150, "duration_ms": 312.5}
{"event": "tool_end", "run_id": "a3f2...", "tool": "search", "duration_ms": 45.2}

Why Observers (vs the removed hooks dict)

| Aspect | Hooks (dict, removed in v1.0) | AgentObserver |
|---|---|---|
| Correlation | Manual (closures, thread-local) | Built-in run_id + call_id |
| Multiple consumers | One callback per event | Multiple observers |
| Event coverage | 8 events | 31 events (including batch, fallback, retry, memory, budget, cancellation, model switch) |
| Type safety | Dict keys are strings | Protocol methods with signatures |
| Use case | Quick debugging, simple logging | Production observability (Langfuse, OTel, Datadog) |

AsyncAgentObserver

For async-native applications (FastAPI, aiohttp, async SQLAlchemy), AsyncAgentObserver provides 28 async a_on_* methods that mirror the sync observer:

from selectools import AsyncAgentObserver

class DBObserver(AsyncAgentObserver):
    blocking = True  # await inline — must complete before next tool

    async def a_on_tool_end(self, run_id, call_id, tool_name, result, duration_ms):
        await db.execute("INSERT INTO events ...")

class WebhookObserver(AsyncAgentObserver):
    blocking = False  # fire-and-forget via asyncio.ensure_future

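    async def a_on_run_end(self, run_id, result):
        # httpx.post() is synchronous; use AsyncClient inside a coroutine
        async with httpx.AsyncClient() as client:
            await client.post("https://hooks.example.com/...", json={...})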

agent = Agent(
    tools=[...],
    provider=provider,
    config=AgentConfig(observers=[DBObserver(), WebhookObserver()]),
)

  • blocking=True: Awaited inline, so the agent loop waits for completion. Use for DB writes, rate limiting, result enrichment.
  • blocking=False (default): Dispatched via asyncio.ensure_future(). Use for webhooks, logging, audit trails.

Async observers are called in arun() and astream() after each sync observer notification. In sync run(), only sync observers fire.
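
The difference between the two dispatch modes can be shown with plain asyncio. This sketch does not use the selectools dispatcher. `dispatch` and `record` are hypothetical, and the `pending` set is an added precaution so that background tasks keep a strong reference and can be drained before the loop closes.

```python
import asyncio

events = []

async def record(label: str) -> None:
    await asyncio.sleep(0.01)
    events.append(label)

async def dispatch(coro, blocking: bool, pending: set) -> None:
    if blocking:
        await coro                      # the caller waits for the observer
    else:
        task = asyncio.ensure_future(coro)
        pending.add(task)               # keep a reference so the task is not garbage-collected
        task.add_done_callback(pending.discard)

async def main():
    pending = set()
    await dispatch(record("background"), blocking=False, pending=pending)
    events.append("loop continues")     # runs before the background observer finishes
    await dispatch(record("inline"), blocking=True, pending=pending)
    events.append("after inline")       # runs only after the inline observer finishes
    await asyncio.gather(*pending)      # drain background work before exiting

asyncio.run(main())
```

Fire-and-forget observers can still be running when the agent returns, so short-lived scripts may need to give pending tasks a chance to finish before the event loop shuts down.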

Trace Metadata & Nested Agents

config = AgentConfig(
    parent_run_id="outer-agent-run-id",
    trace_metadata={"user_id": "u123", "environment": "production"},
    observers=[MyObserver()],
)
agent = Agent(tools=[...], provider=provider, config=config)
result = agent.run("classify this")
print(result.trace.parent_run_id)   # "outer-agent-run-id"
print(result.trace.metadata)         # {"user_id": "u123", "environment": "production"}

# Export as OpenTelemetry spans
spans = result.trace.to_otel_spans()

Memory Integration

Basic Memory

memory = ConversationMemory(max_messages=20)
agent = Agent(tools=[...], provider=provider, memory=memory)

# First turn
response1 = agent.run([Message(role=Role.USER, content="My name is Alice")])

# Second turn - history is preserved
response2 = agent.run([Message(role=Role.USER, content="What's my name?")])
# LLM can reference "Alice" from previous turn

Flow:

graph TD
    A["run() called"] --> B["memory.get_history()"]
    B --> C["Append new messages"]
    C --> D["memory.add_many(new_messages)"]
    D --> E["Execute loop"]
    E --> F["memory.add(final_response)"]
    F --> G["Return"]

Without Memory

agent = Agent(tools=[...], provider=provider)  # No memory

# Each call is independent
response = agent.run([Message(role=Role.USER, content="Hello")])

History is local to each run() call.

Memory Limits

memory = ConversationMemory(
    max_messages=20,      # Keep last 20 messages
    max_tokens=4000       # Or limit by token count
)

When limits are exceeded, oldest messages are dropped (sliding window).
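
The sliding-window behaviour for `max_messages` is the same idea as a bounded deque. The snippet below is only an analogy using the standard library, not ConversationMemory itself.

```python
from collections import deque

window = deque(maxlen=3)                 # stand-in for max_messages=3
for text in ["m1", "m2", "m3", "m4", "m5"]:
    window.append(text)                  # appending past maxlen evicts the oldest

kept = list(window)
```

After five appends only the three most recent messages remain, so anything the user said in `m1` or `m2` is no longer visible to the model unless it is summarized or stored elsewhere.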

Persistent Sessions

Auto-save and auto-load conversation state across process restarts using session_store and session_id:

from selectools.sessions import JsonFileSessionStore

store = JsonFileSessionStore(directory="./sessions")
agent = Agent(
    tools=[...], provider=provider,
    config=AgentConfig(session_store=store, session_id="user-123"),
)

# First run — auto-loads existing session (if any), auto-saves after
result = agent.run([Message(role=Role.USER, content="My name is Alice")])

# Later (even after restart) — session is restored automatically
result = agent.run([Message(role=Role.USER, content="What's my name?")])
# Agent knows: "Alice"

Four backends are available: JsonFileSessionStore, SQLiteSessionStore, RedisSessionStore, SupabaseSessionStore. All support namespace isolation; the file/SQLite/Redis backends also support TTL-based expiry.

See Sessions Module for backend details and TTL configuration.

Summarize-on-Trim

When messages are trimmed by the sliding window, optionally generate a summary of the dropped messages and inject it as system context:

agent = Agent(
    tools=[...], provider=provider,
    memory=ConversationMemory(max_messages=30),
    config=AgentConfig(
        summarize_on_trim=True,
        summarize_provider=provider,       # Provider for summarization
        summarize_model="gpt-4o-mini",     # Use a cheap/fast model
        summarize_max_tokens=150,          # Max tokens for the summary
    ),
)

Flow: When _enforce_limits() trims messages → the trimmed messages are sent to the summarize provider → a 2-3 sentence summary is generated → stored in memory.summary → injected as a system-level context message on subsequent turns.
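
The flow can be sketched without an LLM by passing in a stub summarizer. `trim_with_summary` and `fake_summarizer` are hypothetical names for this illustration. The real feature sends the dropped messages to the configured summarize provider.

```python
from typing import Callable, List, Optional, Tuple

def trim_with_summary(messages: List[str], max_messages: int,
                      summarize: Callable[[List[str]], str],
                      ) -> Tuple[List[str], Optional[str]]:
    """Keep the newest max_messages; summarize whatever falls off the front."""
    overflow = len(messages) - max_messages
    if overflow <= 0:
        return messages, None
    dropped, kept = messages[:overflow], messages[overflow:]
    return kept, summarize(dropped)

def fake_summarizer(dropped: List[str]) -> str:
    # A real setup would call the summarize provider here.
    return f"Earlier: {len(dropped)} messages about {dropped[0]!r}"

kept, summary = trim_with_summary(
    ["name is Alice", "ok", "weather?", "sunny"], 2, fake_summarizer
)
```

The summary is produced only when something is actually dropped, so conversations that stay under the limit incur no extra provider calls.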

See Memory Module for implementation details.

Entity Memory

Automatically extract named entities (people, organizations, projects, etc.) from each turn and inject them as context:

from selectools import EntityMemory

entity_memory = EntityMemory(provider=provider)
agent = Agent(
    tools=[...], provider=provider, memory=memory,
    config=AgentConfig(entity_memory=entity_memory),
)

agent.run([Message(role=Role.USER, content="I'm working with Alice from Acme Corp")])
# Extracts: Alice (person, Acme Corp), Acme Corp (organization)
# Injected as [Known Entities] in system prompt on next turn

See Entity Memory Module for entity types, deduplication, and LRU pruning.

Knowledge Graph Memory

Extract (subject, relation, object) triples from conversation and query them for context injection:

from selectools import KnowledgeGraphMemory

kg = KnowledgeGraphMemory(provider=provider, storage="sqlite")
agent = Agent(
    tools=[...], provider=provider, memory=memory,
    config=AgentConfig(knowledge_graph=kg),
)

agent.run([Message(role=Role.USER, content="Alice manages Project Alpha")])
# Extracts: (Alice, manages, Project Alpha)
# Injected as [Known Relationships] in system prompt on next turn

See Knowledge Graph Module for storage backends and querying.

Cross-Session Knowledge Memory

Persistent knowledge that survives across sessions — daily logs plus a long-term fact store:

from selectools import KnowledgeMemory

knowledge = KnowledgeMemory(directory="./workspace", recent_days=2)
agent = Agent(
    tools=[...], provider=provider,
    config=AgentConfig(knowledge_memory=knowledge),
)
# Auto-registers a `remember` tool — the agent can save facts explicitly
# [Long-term Memory] and [Recent Memory] injected into system prompt

See Knowledge Memory Module for daily logs, fact storage, and retention configuration.

Context Injection Order

When multiple memory features are active, context is injected into the system prompt in this order:

1. [Conversation Summary]     ← summarize_on_trim
2. [Known Entities]           ← entity_memory
3. [Known Relationships]      ← knowledge_graph
4. [Long-term Memory]         ← knowledge_memory (persistent facts)
5. [Recent Memory]            ← knowledge_memory (daily logs)

Each section is only present when the corresponding feature is configured and has data.
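
As an illustration of the ordering and skip-if-empty rule, the sketch below assembles the context block from a dict. `SECTION_ORDER` and `build_context` are hypothetical, and the exact text layout selectools emits may differ.

```python
from typing import Dict, List

SECTION_ORDER: List[str] = [
    "Conversation Summary", "Known Entities", "Known Relationships",
    "Long-term Memory", "Recent Memory",
]

def build_context(sections: Dict[str, str]) -> str:
    """Emit configured, non-empty sections in the documented order."""
    parts = [f"[{name}]\n{sections[name]}" for name in SECTION_ORDER if sections.get(name)]
    return "\n\n".join(parts)

context = build_context({
    "Recent Memory": "2026-06-10: discussed pricing",
    "Known Entities": "Alice (person)",
    "Conversation Summary": "",          # configured but empty: skipped
})
```

The output order follows `SECTION_ORDER`, not the order in which the sections were supplied.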


Streaming

Agent.astream()

The astream() method provides token-by-token streaming with full feature parity with run() and arun() (as of v0.16.3). It supports response_format, parent_run_id, input/output guardrails, coherence checks, knowledge context injection, entity/KG extraction, session save, structured output validation, analytics, and verbose output.

async for item in agent.astream([Message(role=Role.USER, content="Search for Python")]):
    if isinstance(item, StreamChunk):
        print(item.content, end="", flush=True)
    elif isinstance(item, AgentResult):
        print(f"\nDone in {item.iterations} iterations")

Signature:

async def astream(
    messages: Union[str, List[Message]],
    response_format: Optional[ResponseFormat] = None,  # Structured output
    parent_run_id: Optional[str] = None,               # Trace linking
) -> AsyncGenerator[Union[StreamChunk, AgentResult], None]:

How It Works

  1. Shared _prepare_run() sets up trace, guardrails, memory, knowledge context (identical to run/arun)
  2. Provider streams text deltas and tool call deltas via astream()
  3. Text chunks are yielded as StreamChunk objects
  4. Shared _process_response() applies output guardrails, parses tool calls, extracts reasoning
  5. Tool calls are executed with coherence checks, output screening, analytics, and usage tracking
  6. Shared _finalize_run() saves session, extracts entities/KG, builds full AgentResult
  7. Final AgentResult is yielded (includes parsed, reasoning, reasoning_history, provider_used)

Provider Protocol

Providers that support streaming implement astream(), an async generator that yields Union[str, ToolCall] items:

  • Text deltas: Yielded as raw str chunks
  • Tool calls: Yielded as complete ToolCall objects when ready

Fallback Behavior

If a provider doesn't support astream(), the agent falls back to:

  1. acomplete() (async non-streaming)
  2. complete() via executor (sync in async wrapper)

The response is still yielded as a single StreamChunk for API consistency.
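
The fallback chain can be sketched with `hasattr` checks. The stub providers here return plain strings for brevity, whereas real providers return richer objects. `stream_text`, `StreamingStub`, and `SyncOnlyStub` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator

async def stream_text(provider, prompt: str) -> AsyncIterator[str]:
    """Yield chunks from the best method the provider offers."""
    if hasattr(provider, "astream"):
        async for chunk in provider.astream(prompt):
            yield chunk
    elif hasattr(provider, "acomplete"):
        yield await provider.acomplete(prompt)            # single chunk
    else:
        loop = asyncio.get_running_loop()
        yield await loop.run_in_executor(None, provider.complete, prompt)

class StreamingStub:
    async def astream(self, prompt):
        for piece in ("Hel", "lo"):
            yield piece

class SyncOnlyStub:
    def complete(self, prompt):
        return "Hello"

async def collect(provider):
    return [chunk async for chunk in stream_text(provider, "hi")]

streamed = asyncio.run(collect(StreamingStub()))
fallback = asyncio.run(collect(SyncOnlyStub()))
```

Callers iterate the same way in both cases. The only visible difference is that a non-streaming provider delivers its whole response as one chunk.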


Parallel Tool Execution

Overview

When an LLM requests multiple tool calls in a single response (common with native function calling), the agent executes them concurrently instead of sequentially.

Configuration

config = AgentConfig(
    parallel_tool_execution=True  # Default: enabled
)

Set to False to force sequential execution.

How It Works

Async (arun, astream)

Uses asyncio.gather() to run all tool calls concurrently:

results = await asyncio.gather(*[run_tool(tc) for tc in tool_calls])

Sync (run)

Uses ThreadPoolExecutor with one worker per tool call:

with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
    futures = [pool.submit(run_tool, tc) for tc in tool_calls]
    results = [f.result() for f in futures]

Guarantees

  1. Result ordering: Tool results are appended to history in the same order as the original tool calls, regardless of completion order
  2. Error isolation: If one tool fails, others still complete successfully
  3. Observer notification: on_tool_start, on_tool_end, and on_tool_error fire for every tool call
  4. Single tool optimization: When only one tool is called, the sequential path is used (no overhead)
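
Result ordering and error isolation both follow from how `asyncio.gather` behaves. The sketch below assumes failures are collected with `return_exceptions=True`; the agent may instead catch errors inside each tool wrapper. `run_tool` here is a hypothetical stand-in.

```python
import asyncio

async def run_tool(name: str, delay: float):
    await asyncio.sleep(delay)
    if name == "stock":
        raise ValueError("ticker not found")
    return f"{name} ok"

async def main():
    calls = [("weather", 0.03), ("stock", 0.01), ("news", 0.0)]
    # return_exceptions=True returns failures as values instead of raising the first one.
    return await asyncio.gather(*(run_tool(n, d) for n, d in calls),
                                return_exceptions=True)

results = asyncio.run(main())
as_messages = [f"Error: {r}" if isinstance(r, Exception) else r for r in results]
```

Even though `news` finishes first and `stock` fails, the results come back in call order with the error in the second slot.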

Example

Three tools each taking 0.15s:

  • Sequential: ~0.45s total
  • Parallel: ~0.15s total (3x speedup)

# Automatic - no code changes needed
agent = Agent(
    tools=[weather_tool, stock_tool, news_tool],
    provider=OpenAIProvider(),
    config=AgentConfig(parallel_tool_execution=True)
)

# LLM requests all 3 tools → executed concurrently
result = await agent.arun([Message(role=Role.USER, content="...")])

Response Caching

Overview

The agent supports pluggable response caching to avoid redundant LLM calls. When AgentConfig(cache=...) is set, the agent checks the cache before every provider.complete() / provider.acomplete() call. On a cache hit, the stored (Message, UsageStats) is returned immediately without calling the LLM.

Architecture

flowchart TD
    A["Agent._call_provider()"] --> B["CacheKeyBuilder.build()\nSHA-256 hex digest"]
    B --> C{"cache.get(key)"}
    C -- HIT --> D["Return cached response\nfire on_llm_end observer event"]
    C -- MISS --> E["provider.complete(...)"]
    E --> F["cache.set(key, response)"]
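The hit/miss flow in the diagram can be sketched with a dictionary standing in for the cache. Everything here is illustrative: fake_provider and call_provider are hypothetical names, and the key is a simplified hash of model and prompt rather than the full CacheKeyBuilder input set.

```python
import hashlib
import json

cache: dict[str, tuple[str, dict]] = {}   # key -> (message, usage)
provider_calls = 0

def fake_provider(prompt: str) -> tuple[str, dict]:
    """Hypothetical provider stub; counts how often it is really called."""
    global provider_calls
    provider_calls += 1
    return f"echo: {prompt}", {"total_tokens": len(prompt.split())}

def call_provider(model: str, prompt: str) -> tuple[str, dict]:
    key = hashlib.sha256(json.dumps([model, prompt]).encode()).hexdigest()
    cached = cache.get(key)
    if cached is not None:            # HIT: skip the provider entirely
        return cached
    response = fake_provider(prompt)  # MISS: call the provider, then store
    cache[key] = response
    return response

first = call_provider("gpt-4o", "What is Python?")
second = call_provider("gpt-4o", "What is Python?")
print(provider_calls)
```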

Cache Protocol

Any object satisfying the Cache protocol can be used:

@runtime_checkable
class Cache(Protocol):
    def get(self, key: str) -> Optional[Tuple[Any, Any]]: ...
    def set(self, key: str, value: Tuple[Any, Any], ttl: Optional[int] = None) -> None: ...
    def delete(self, key: str) -> bool: ...
    def clear(self) -> None: ...

    @property
    def stats(self) -> CacheStats: ...

Built-in Backends

InMemoryCache

Thread-safe LRU + TTL cache with zero external dependencies:

from selectools import InMemoryCache

cache = InMemoryCache(
    max_size=1000,    # Max entries (LRU eviction)
    default_ttl=300,  # 5 minutes
)

Features:

  • OrderedDict-based O(1) LRU operations
  • Per-entry TTL with monotonic timestamp expiry
  • Thread-safe via threading.Lock
  • CacheStats tracking (hits, misses, evictions, hit_rate)
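How these features fit together can be sketched in a few dozen lines. TinyLRUCache below is a hypothetical teaching example, not the InMemoryCache source: it combines an OrderedDict for LRU order, time.monotonic() for expiry, and a threading.Lock around every operation.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Optional

class TinyLRUCache:
    """Illustrative LRU + TTL cache; not the selectools implementation."""

    def __init__(self, max_size: int, default_ttl: float) -> None:
        self._data: "OrderedDict[str, tuple[Any, float]]" = OrderedDict()
        self._max_size = max_size
        self._default_ttl = default_ttl
        self._lock = threading.Lock()
        self.evictions = 0

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return None
            value, expires_at = entry
            if time.monotonic() >= expires_at:   # expired: drop and miss
                del self._data[key]
                return None
            self._data.move_to_end(key)          # mark as most recently used
            return value

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        with self._lock:
            lifetime = self._default_ttl if ttl is None else ttl
            self._data[key] = (value, time.monotonic() + lifetime)
            self._data.move_to_end(key)
            while len(self._data) > self._max_size:
                self._data.popitem(last=False)   # evict least recently used
                self.evictions += 1

cache = TinyLRUCache(max_size=2, default_ttl=60)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")       # touch "a" so "b" becomes the LRU entry
cache.set("c", 3)    # exceeds max_size, evicts "b"

expiring = TinyLRUCache(max_size=2, default_ttl=0)
expiring.set("x", 1)  # a TTL of 0 expires on the next read
```

A monotonic clock is used for expiry because wall-clock time can jump backwards or forwards when the system clock is adjusted.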

RedisCache

Distributed TTL cache for multi-process deployments:

from selectools.cache_redis import RedisCache

cache = RedisCache(
    url="redis://localhost:6379/0",
    prefix="selectools:",
    default_ttl=300,
)

Features:

  • Server-side TTL management
  • Pickle-serialized (Message, UsageStats) entries
  • Key prefix namespacing
  • Requires optional dependency: pip install selectools[cache]

Cache Key Generation

CacheKeyBuilder creates deterministic SHA-256 keys from request parameters:

from selectools import CacheKeyBuilder

key = CacheKeyBuilder.build(
    model="gpt-4o",
    system_prompt="You are a helpful assistant.",
    messages=[Message(role=Role.USER, content="Hello")],
    tools=[my_tool],
    temperature=0.0,
)
# → "selectools:a3f2b8c1d4e5..."

Inputs hashed: model, system_prompt, messages (role + content + tool_calls), tools (name + description + parameters), temperature.

Guarantees:

  • Same inputs always produce the same key
  • Different inputs produce different keys (barring SHA-256 collisions, which are negligible in practice)
  • Tool ordering is preserved in the hash
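One way to obtain these properties is to hash a canonical JSON serialization of the inputs. The sketch below is an assumption about the general technique, not the CacheKeyBuilder source: build_key is a hypothetical function, and messages and tools are represented as plain dicts.

```python
import hashlib
import json

def build_key(model, system_prompt, messages, tools, temperature, prefix="selectools:"):
    """Illustrative key builder: canonical JSON -> SHA-256 hex digest."""
    payload = {
        "model": model,
        "system_prompt": system_prompt,
        "messages": messages,      # list order is preserved
        "tools": tools,            # so tool ordering affects the key
        "temperature": temperature,
    }
    # sort_keys makes dict key order irrelevant; lists keep their order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return prefix + hashlib.sha256(canonical.encode("utf-8")).hexdigest()

msgs = [{"role": "user", "content": "Hello"}]
tool_a = {"name": "a", "description": "first", "parameters": {}}
tool_b = {"name": "b", "description": "second", "parameters": {}}

key_1 = build_key("gpt-4o", "You are helpful.", msgs, [tool_a, tool_b], 0.0)
key_2 = build_key("gpt-4o", "You are helpful.", msgs, [tool_a, tool_b], 0.0)
key_reordered = build_key("gpt-4o", "You are helpful.", msgs, [tool_b, tool_a], 0.0)
key_warmer = build_key("gpt-4o", "You are helpful.", msgs, [tool_a, tool_b], 0.7)
```

Because tool order changes the key, registering the same tools in a different order produces a cache miss.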

What Gets Cached

| Call Type | Cached? | Reason |
|---|---|---|
| provider.complete() | Yes | Deterministic request/response |
| provider.acomplete() | Yes | Deterministic request/response |
| provider.astream() | No | Non-replayable generator |
| Tool execution results | No | Side effects possible |

Usage Examples

Basic In-Memory Caching

from selectools import Agent, AgentConfig, InMemoryCache

cache = InMemoryCache(max_size=500, default_ttl=600)
config = AgentConfig(model="gpt-4o-mini", cache=cache)
agent = Agent(tools=[my_tool], provider=provider, config=config)

# First call → cache miss → LLM called
response1 = agent.run([Message(role=Role.USER, content="What is Python?")])

# Reset history, same question → cache hit → instant response
agent.reset()
response2 = agent.run([Message(role=Role.USER, content="What is Python?")])

print(cache.stats)
# CacheStats(hits=1, misses=1, evictions=0, hit_rate=50.00%)

Distributed Redis Caching

from selectools.cache_redis import RedisCache

cache = RedisCache(url="redis://my-redis:6379/0", default_ttl=900)
config = AgentConfig(cache=cache)

# Cache is shared across processes/servers
agent = Agent(tools=[...], provider=provider, config=config)

Monitoring Cache Performance

stats = cache.stats
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Hits: {stats.hits}, Misses: {stats.misses}")
print(f"Evictions: {stats.evictions}")

Verbose Mode

When verbose=True, cache hits are logged:

[agent] cache hit -- skipping provider call

Integration with Usage Tracking

Cache hits still contribute to AgentUsage. The stored UsageStats is replayed via agent.usage.add_usage(), so cost tracking remains accurate even when responses come from cache.


Structured Output

Overview

Pass a Pydantic BaseModel or dict JSON Schema as response_format to get typed, validated results from the LLM. The agent injects schema instructions into the system prompt, extracts JSON from the response, validates it, and retries on failure.

Usage

from pydantic import BaseModel
from typing import Literal

class Classification(BaseModel):
    intent: Literal["billing", "support", "sales", "cancel"]
    confidence: float
    priority: Literal["low", "medium", "high"]

result = agent.ask("I want to cancel my account", response_format=Classification)
print(result.parsed)  # Classification(intent="cancel", confidence=0.95, priority="high")
print(result.content)  # Raw JSON string

How It Works

  1. build_schema_instruction(schema) generates a prompt fragment describing the expected JSON shape
  2. Schema instruction is appended to the system prompt for the duration of the run
  3. LLM response is passed through extract_json() to isolate the JSON block
  4. parse_and_validate() validates against the Pydantic model or JSON Schema
  5. On validation failure, the error is fed back to the LLM for a retry
  6. result.parsed contains the typed object; result.content has the raw string
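The validate-and-retry cycle in steps 3 to 5 can be sketched without any framework code. The names validate, run_structured, and scripted_llm are hypothetical, the hand-written checks stand in for Pydantic or JSON Schema validation, and the retry message wording is invented for illustration.

```python
import json

def validate(raw: str) -> dict:
    """Hypothetical validator: needs 'intent' (str) and 'confidence' (number)."""
    data = json.loads(raw)                       # raises ValueError on bad JSON
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    if not isinstance(data.get("intent"), str):
        raise ValueError("'intent' must be a string")
    if not isinstance(data.get("confidence"), (int, float)):
        raise ValueError("'confidence' must be a number")
    return data

def run_structured(llm, prompt: str, max_retries: int = 3) -> tuple[dict, int]:
    messages = [prompt]
    for attempt in range(max_retries + 1):
        reply = llm(messages)
        try:
            return validate(reply), attempt
        except ValueError as exc:
            # Feed the validation error back so the model can correct itself.
            messages.append(f"Your last reply was invalid: {exc}. Return only JSON.")
    raise ValueError(f"no valid output after {max_retries} retries")

# Scripted stand-in for an LLM: fails twice, then complies.
replies = iter([
    "not json",
    '{"intent": "cancel"}',
    '{"intent": "cancel", "confidence": 0.95}',
])

def scripted_llm(messages):
    return next(replies)

parsed, retries_used = run_structured(scripted_llm, "Classify: cancel my account")
print(parsed, retries_used)
```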

Structured Retry Budget (v0.22.0 — BUG-34)

Structured-validation retries now have their own budget, decoupled from max_iterations. Previously, a single global counter was shared between tool-execution iterations and structured-validation retries — an agent with max_iterations=3 and an LLM that failed JSON validation 3 times would terminate before reaching RetryConfig.max_retries.

agent = Agent(
    tools=[...],
    provider=provider,
    config=AgentConfig(
        max_iterations=5,                    # Tool-execution budget
        retry=RetryConfig(max_retries=3),    # Structured-validation retry budget (independent)
    ),
)

  • max_iterations controls how many times tools can be called
  • RetryConfig.max_retries controls how many structured-validation retries are allowed
  • A validation failure increments the retry counter without consuming a tool iteration

Supported Formats

  • Pydantic v2 BaseModel: Full schema generation with type coercion
  • dict JSON Schema: Raw JSON Schema for non-Pydantic users

ResponseFormat Type

ResponseFormat is a type alias for what response_format accepts:

from selectools import ResponseFormat  # Union[Type[Any], Dict[str, Any]]

It accepts either a Pydantic BaseModel subclass or a raw JSON Schema dict.

Standalone Helpers

These utilities can be used independently for custom validation pipelines:

from selectools.structured import (
    extract_json,
    schema_from_response_format,
    parse_and_validate,
    build_schema_instruction,
    validation_retry_message,
)

| Function | Description |
|---|---|
| extract_json(text) | Extract the first JSON object/array from text (handles code blocks, brace-balanced extraction). Returns None if no JSON found. |
| schema_from_response_format(fmt) | Convert a Pydantic model or dict to a JSON Schema dict. |
| parse_and_validate(text, fmt) | Extract JSON from text, validate against schema, return typed object. Raises ValueError on failure. |
| build_schema_instruction(schema) | Generate the system prompt fragment that instructs the LLM to produce JSON matching the schema. |
| validation_retry_message(error) | Generate the retry message sent to the LLM when validation fails. |
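To make "brace-balanced extraction" concrete, here is a minimal sketch of the idea. It is not the extract_json source: first_json_object is a hypothetical helper that handles only objects (not arrays) and skips the code-block handling mentioned above.

```python
import json
from typing import Optional

def first_json_object(text: str) -> Optional[str]:
    """Return the first brace-balanced {...} span that parses as JSON, else None."""
    start = text.find("{")
    while start != -1:
        depth, in_string, escaped = 0, False, False
        for i in range(start, len(text)):
            ch = text[i]
            if in_string:
                # Braces inside string literals must not affect the depth.
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    candidate = text[start : i + 1]
                    try:
                        json.loads(candidate)
                        return candidate
                    except ValueError:
                        break      # balanced but not JSON; try the next "{"
        start = text.find("{", start + 1)
    return None

found = first_json_object('Result: {"label": "a}b", "meta": {"score": 0.9}} trailing')
missing = first_json_object("no json here")
```

Tracking string state matters: the "}" inside "a}b" would otherwise close the object early and yield an invalid fragment.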

Example — custom extraction pipeline:

from selectools.structured import extract_json, parse_and_validate
from pydantic import BaseModel

class Sentiment(BaseModel):
    label: str
    score: float

raw_text = 'Here is the analysis: ```json\n{"label": "positive", "score": 0.95}\n```'

json_str = extract_json(raw_text)     # '{"label": "positive", "score": 0.95}'
result = parse_and_validate(raw_text, Sentiment)  # Sentiment(label="positive", score=0.95)

TraceStep Types for Structured Output

When structured validation fails, a structured_retry step appears in the trace:

for step in result.trace:
    if step.type == "structured_retry":
        print(f"Validation failed: {step.error}")

Execution Traces

Overview

Every run() / arun() automatically produces an AgentTrace — a structured timeline of the entire execution. Access it via result.trace.

Usage

result = agent.run("Classify this ticket")

for step in result.trace:
    print(f"{step.type} | {step.duration_ms:.0f}ms | {step.summary}")

result.trace.to_json("trace.json")
print(result.trace.timeline())

llm_steps = result.trace.filter(type="llm_call")
total_llm_ms = sum(s.duration_ms for s in llm_steps)

TraceStep Types

| Type | Description |
|---|---|
| llm_call | Provider API call with model, tokens, duration |
| tool_selection | LLM chose a tool (name, args, reasoning) |
| tool_execution | Tool was executed (name, result summary, duration) |
| cache_hit | Response served from cache |
| error | Error during execution |
| structured_retry | Structured output validation failed, retrying |
| guardrail | Input/output guardrail triggered (v0.15.0) |
| coherence_check | Coherence check blocked a tool call (v0.15.0) |
| output_screening | Tool output screening detected injection (v0.15.0) |
| session_load | Session loaded from store (v0.16.0) |
| session_save | Session saved to store (v0.16.0) |
| memory_summarize | Trimmed messages summarized (v0.16.0) |
| entity_extraction | Entities extracted from conversation (v0.16.0) |
| kg_extraction | Knowledge graph triples extracted (v0.16.0) |

AgentTrace Methods

  • trace.to_dict() — Serialize to dict
  • trace.to_json(filepath) — Write JSON to file
  • trace.timeline() — Human-readable timeline string
  • trace.filter(type=...) — Filter steps by type
  • trace.total_duration_ms — Total execution time
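For readers who want a mental model of the trace object, the sketch below mimics the methods listed above with two dataclasses. Step and Trace are hypothetical stand-ins whose field names follow the usage example; the real AgentTrace may store more data and format its timeline differently.

```python
from dataclasses import asdict, dataclass, field

@dataclass
class Step:
    type: str
    duration_ms: float
    summary: str

@dataclass
class Trace:
    steps: list[Step] = field(default_factory=list)

    def filter(self, type: str) -> list[Step]:
        # Keep only the steps whose type matches, in original order.
        return [s for s in self.steps if s.type == type]

    @property
    def total_duration_ms(self) -> float:
        return sum(s.duration_ms for s in self.steps)

    def timeline(self) -> str:
        # One aligned line per step: type, duration, summary.
        return "\n".join(
            f"{s.type:<15}{s.duration_ms:>6.0f}ms  {s.summary}" for s in self.steps
        )

    def to_dict(self) -> dict:
        return {
            "steps": [asdict(s) for s in self.steps],
            "total_duration_ms": self.total_duration_ms,
        }

trace = Trace([
    Step("llm_call", 420.0, "gpt-4o"),
    Step("tool_execution", 35.0, "search"),
    Step("llm_call", 380.0, "gpt-4o"),
])
llm_ms = sum(s.duration_ms for s in trace.filter(type="llm_call"))
print(trace.timeline())
```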

Reasoning Visibility

Overview

LLMs often return explanatory text alongside tool calls. This reasoning is now captured and surfaced on AgentResult.

Usage

result = agent.run("Route this customer request")

print(result.reasoning)
# "The customer is asking about billing charges, routing to billing_support"

for i, reasoning in enumerate(result.reasoning_history):
    print(f"Iteration {i}: {reasoning}")

How It Works

The agent extracts text content from LLM responses that precede or accompany tool call decisions. No extra LLM calls are needed — it purely captures what providers already return but previously discarded.

  • result.reasoning — reasoning text from the final tool selection
  • result.reasoning_history — list of reasoning strings, one per iteration
  • step.reasoning on tool_selection trace steps

Provider Fallback

Overview

FallbackProvider wraps multiple providers in priority order. If one fails, the next is tried automatically with circuit breaker protection.

Usage

from selectools import FallbackProvider, OpenAIProvider, AnthropicProvider

provider = FallbackProvider([
    OpenAIProvider(default_model="gpt-4o-mini"),
    AnthropicProvider(default_model="claude-haiku"),
])
agent = Agent(tools=[...], provider=provider)

Circuit Breaker

After max_failures consecutive failures, a provider is skipped for cooldown_seconds:

provider = FallbackProvider(
    providers=[openai, anthropic, local],
    max_failures=3,
    cooldown_seconds=60,
    on_fallback=lambda name, error: print(f"Skipping {name}: {error}"),
)
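The circuit-breaker behaviour can be sketched independently of any provider SDK. Breaker and complete_with_fallback are hypothetical names, and the injected clock is an assumption made so the cooldown can be demonstrated without waiting; FallbackProvider's internals may differ.

```python
import time

class Breaker:
    """Illustrative per-provider circuit breaker; not the selectools internals."""

    def __init__(self, max_failures: int, cooldown_seconds: float, clock=time.monotonic):
        self.max_failures = max_failures
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def available(self) -> bool:
        if self.opened_at is None:
            return True
        if self.clock() - self.opened_at >= self.cooldown_seconds:
            self.opened_at, self.failures = None, 0   # cooldown over: retry
            return True
        return False

    def record(self, ok: bool) -> None:
        if ok:
            self.failures = 0
            return
        self.failures += 1
        if self.failures >= self.max_failures:
            self.opened_at = self.clock()             # open the circuit

def complete_with_fallback(providers, breakers, prompt):
    for provider, breaker in zip(providers, breakers):
        if not breaker.available():
            continue                                  # provider is cooling down
        try:
            result = provider(prompt)
        except Exception:
            breaker.record(ok=False)
            continue
        breaker.record(ok=True)
        return result
    raise RuntimeError("all providers failed or are cooling down")

def flaky(prompt):
    raise ConnectionError("primary is down")

def backup(prompt):
    return f"backup: {prompt}"

fake_now = [0.0]                                      # controllable clock for the demo
breakers = [Breaker(2, 60, clock=lambda: fake_now[0]) for _ in range(2)]
answers = [complete_with_fallback([flaky, backup], breakers, "hi") for _ in range(3)]
primary_open = not breakers[0].available()
fake_now[0] = 61.0
primary_recovered = breakers[0].available()
```

After two consecutive failures the primary is skipped without being called, and it becomes eligible again once the cooldown has elapsed.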

Supported Methods

FallbackProvider implements the full Provider protocol: complete(), acomplete(), stream(), astream().


Batch Processing

Overview

Process multiple prompts concurrently with configurable parallelism.

Usage

# Sync
results = agent.batch(
    ["Cancel my sub", "How do I upgrade?", "Payment failed"],
    max_concurrency=5,
)

# Async
results = await agent.abatch(
    ["Cancel my sub", "How do I upgrade?", "Payment failed"],
    max_concurrency=10,
)

Guarantees

  • Returns list[AgentResult] in the same order as the input prompts
  • Per-request error isolation (one failure doesn't cancel the batch)
  • Respects response_format if provided
  • on_progress(completed, total) callback for monitoring
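These guarantees map onto a common asyncio pattern: a semaphore to cap concurrency and gather() to keep input order. The sketch below is an illustration under that assumption, not the abatch() source; run_batch and fake_agent are hypothetical, and failures are returned as exception objects purely for the demo.

```python
import asyncio

async def run_batch(worker, prompts, max_concurrency=5, on_progress=None):
    semaphore = asyncio.Semaphore(max_concurrency)
    completed = 0

    async def guarded(prompt):
        nonlocal completed
        async with semaphore:                 # cap in-flight requests
            try:
                return await worker(prompt)
            except Exception as exc:          # isolate per-request failures
                return exc
            finally:
                completed += 1
                if on_progress:
                    on_progress(completed, len(prompts))

    # gather() preserves input order in its result list.
    return await asyncio.gather(*(guarded(p) for p in prompts))

async def fake_agent(prompt):
    """Hypothetical agent call that fails for prompts containing 'fail'."""
    await asyncio.sleep(0.01)
    if "fail" in prompt:
        raise ValueError("simulated failure")
    return prompt.upper()

progress = []
results = asyncio.run(run_batch(
    fake_agent,
    ["cancel", "please fail", "upgrade"],
    max_concurrency=2,
    on_progress=lambda done, total: progress.append((done, total)),
))
```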

Tool Policy & Human-in-the-Loop

Overview

Declarative allow/review/deny rules evaluated before every tool execution, with optional human approval for flagged tools.

Tool Policy

from selectools import ToolPolicy

policy = ToolPolicy(
    allow=["search_*", "read_*", "get_*"],
    review=["send_*", "create_*", "update_*"],
    deny=["delete_*", "drop_*"],
    deny_when=[{"tool": "send_email", "arg": "to", "pattern": "*@external.com"}],
)
config = AgentConfig(tool_policy=policy)

Evaluation order: deny → review → allow; unknown tools default to review.
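The evaluation order can be sketched with glob matching from the standard library. This is an illustration, not the ToolPolicy source: evaluate is a hypothetical function, fnmatch-style matching is an assumption, and so is checking deny_when together with the deny rules.

```python
from fnmatch import fnmatchcase

def evaluate(tool_name, tool_args, *, allow=(), review=(), deny=(), deny_when=()):
    """Illustrative evaluator: deny, then review, then allow, else review."""
    # Assumption: argument-based deny rules are checked with the deny stage.
    for rule in deny_when:
        value = str(tool_args.get(rule["arg"], ""))
        if tool_name == rule["tool"] and fnmatchcase(value, rule["pattern"]):
            return "deny"
    for decision, patterns in (("deny", deny), ("review", review), ("allow", allow)):
        if any(fnmatchcase(tool_name, p) for p in patterns):
            return decision
    return "review"   # unknown tools default to review

policy = dict(
    allow=["search_*", "read_*", "get_*"],
    review=["send_*", "create_*", "update_*"],
    deny=["delete_*", "drop_*"],
    deny_when=[{"tool": "send_email", "arg": "to", "pattern": "*@external.com"}],
)

decisions = {
    "search": evaluate("search_docs", {}, **policy),
    "internal_mail": evaluate("send_email", {"to": "a@corp.com"}, **policy),
    "external_mail": evaluate("send_email", {"to": "x@external.com"}, **policy),
    "delete": evaluate("delete_user", {}, **policy),
    "unknown": evaluate("mystery_tool", {}, **policy),
}
print(decisions)
```

Checking deny first means a tool matching both a deny and an allow pattern is always blocked.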

Human-in-the-Loop

async def confirm(tool_name: str, tool_args: dict, reason: str) -> bool:
    return await get_user_approval(tool_name, tool_args)

config = AgentConfig(
    tool_policy=policy,
    confirm_action=confirm,
    approval_timeout=60,
)

Agent loop behaviour:

| Policy Decision | Behaviour |
|---|---|
| allow | Execute immediately |
| review + confirm_action | Call callback; execute if approved, deny if rejected |
| review + no callback | Deny with error message to LLM |
| deny | Return error to LLM, never execute |

Agent-Level Approval Gate (require_approval + approval_handler)

@beta — introduced for the ROADMAP P2 "Agent-Level Human-in-the-Loop" item.

Standalone agents (no AgentGraph required) can gate named tools behind an approval handler, centralizing what @tool(requires_approval=True) does per-tool:

from selectools import Agent, AgentConfig, ApprovalRequest
from selectools.agent.config_groups import ToolConfig

def my_callback(request: ApprovalRequest) -> bool:   # sync or async
    print(request.preview)      # e.g. send_email(to='a@b.com', subject='hi')
    print(request.tool_name, request.tool_args, request.reason)
    return ask_human(request)   # truthy = execute, falsy = deny

config = AgentConfig(tool=ToolConfig(
    require_approval=["execute_shell", "send_email"],   # or "*" for all tools
    approval_handler=my_callback,
    approval_timeout=60,
))

Semantics:

  • A tool is gated when it appears in require_approval (or "*" is used) OR it was defined with requires_approval=True — config and tool-level gates compose with OR.
  • approval_handler receives a structured ApprovalRequest (tool_name, tool_args, reason, one-line preview) and returns a bool. Any non-bool return value fails CLOSED — coroutines, generators, mocks, and other accidentally-truthy objects deny the call instead of approving it. When set, it services every review decision (config gate, tool flag, and ToolPolicy review rules) and takes precedence over confirm_action. request.tool_args is a defensive copy — mutating it never changes what the tool executes with.
  • Deny → the tool is not executed; the model sees a standardized Tool '<name>' denied by approval handler: <reason> tool result and the loop continues (mirrors confirm_action denial). Denials are memoized per (tool name, args) within the run: if the model retries the identical denied call on a later iteration, the stored denial is returned without re-paging the human. Approvals are never memoized — every approved call is re-requested.
  • Async handlers work from run() too: from the sync path the coroutine executes via asyncio.run on the shared worker pool; from arun()/astream() it is awaited natively. Both plain coroutine functions and class instances with async def __call__ are detected. Sync handlers run in an executor with contextvars propagated.
  • Handler exceptions and approval_timeout expiries deny the call (with the error in the tool result) — the loop never crashes. Timed-out approval futures are cancelled so a still-queued handler never fires after the call was already denied.
  • Pool contention: blocking handlers (and async handlers invoked from sync run()) each occupy one slot of the shared 16-worker selectools_tool_timeout pool — the same pool that enforces timeout_seconds for ALL agents in the process. Sixteen concurrent approvals waiting on humans will stall tool-timeout enforcement everywhere; keep handlers prompt, set approval_timeout deliberately, and prefer the async entry points for high-concurrency deployments.
  • ToolPolicy deny rules still win unconditionally; the handler is never consulted.
  • Fail-fast validation: ToolConfig(require_approval=[...]) without an approval_handler or confirm_action raises ValueError at construction — an unapprovable gate would silently deny every call and burn iterations. Use ToolPolicy(deny=[...]) to hard-block tools instead.
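Two of these rules, fail-closed handling of non-bool results and per-run memoization of denials, are easy to get wrong, so a small sketch may help. ApprovalGate is a hypothetical class written for illustration; the handler takes plain arguments instead of an ApprovalRequest, and timeouts and async handlers are left out.

```python
import json

class ApprovalGate:
    """Illustrative gate: strict-bool approval plus per-run denial memo."""

    def __init__(self, handler):
        self.handler = handler
        self.denied = set()          # (tool_name, canonical args) denied this run
        self.handler_calls = 0

    def approve(self, tool_name: str, tool_args: dict) -> bool:
        memo_key = (tool_name, json.dumps(tool_args, sort_keys=True))
        if memo_key in self.denied:
            return False                       # do not re-page the human
        self.handler_calls += 1
        try:
            # Shallow copy so the handler cannot alter the real arguments.
            verdict = self.handler(tool_name, dict(tool_args))
        except Exception:
            verdict = False                    # handler errors deny the call
        approved = verdict is True             # anything but True fails closed
        if not approved:
            self.denied.add(memo_key)
        return approved

# This handler returns a truthy string for send_email, which must not approve.
gate = ApprovalGate(lambda name, args: "yes" if name == "send_email" else True)
first = gate.approve("send_email", {"to": "a@b.com"})
repeat = gate.approve("send_email", {"to": "a@b.com"})   # served from the denial memo
shell_1 = gate.approve("execute_shell", {"cmd": "ls"})
shell_2 = gate.approve("execute_shell", {"cmd": "ls"})   # approvals are re-requested
```

The handler runs three times for four requests: the repeated denial is answered from the memo, while the repeated approval is asked again.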

For out-of-loop confirmation (WhatsApp/Telegram webhook turns), see Deferred Confirmation. See examples/108_agent_hitl.py for an offline runnable demo.


Terminal Actions

Some tools are "terminal" — the agent loop should stop after they execute, without making another LLM call.

Static declaration — tool author marks it at definition time:

import json
from selectools import tool

@tool(terminal=True)
def present_question(question_id: int) -> str:
    """Present a question card to the student."""
    return json.dumps({"action": "present_question", "id": question_id})

Dynamic condition — stop decision depends on the result content:

config = AgentConfig(
    stop_condition=lambda tool_name, result: "present_question" in result,
)

After tool execution, the agent checks: tool.terminal or (config.stop_condition and config.stop_condition(tool_name, result))

If true, the tool result becomes AgentResult.content and the loop exits immediately. Works in run(), arun(), astream(), and parallel tool execution.
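The check itself is a one-liner, but its effect on control flow is clearer in a runnable form. The sketch below simplifies heavily: FakeTool and run_tools are hypothetical, tools take no arguments, and a plain loop over tools replaces the LLM-driven agent loop.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class FakeTool:
    name: str
    func: Callable[[], str]
    terminal: bool = False

def run_tools(tools, stop_condition: Optional[Callable[[str, str], bool]] = None):
    """Run tools in order; return (content, executed names), stopping on a terminal one."""
    executed = []
    for tool in tools:
        result = tool.func()
        executed.append(tool.name)
        if tool.terminal or (stop_condition and stop_condition(tool.name, result)):
            return result, executed            # result becomes the final content
    return None, executed                      # no terminal tool: loop would continue

tools = [
    FakeTool("lookup", lambda: "42"),
    FakeTool("present_question", lambda: '{"id": 7}', terminal=True),
    FakeTool("never_runs", lambda: "unreachable"),
]

# Static flag: stops after present_question, so never_runs is skipped.
content, executed = run_tools(tools)

# Dynamic condition: stops as soon as a result equals "42".
dynamic_content, _ = run_tools(
    tools[:1], stop_condition=lambda name, result: result == "42"
)
```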


Implementation Details

Internal Architecture — Mixin Decomposition

The Agent class is composed from 4 internal mixins for maintainability:

| Mixin | File | Responsibility |
|---|---|---|
| _ToolExecutorMixin | agent/_tool_executor.py | Tool execution pipeline, policy, coherence, parallel execution |
| _ProviderCallerMixin | agent/_provider_caller.py | LLM provider calls, caching, retry, streaming |
| _LifecycleMixin | agent/_lifecycle.py | Observer notification, fallback provider wiring |
| _MemoryManagerMixin | agent/_memory_manager.py | Memory operations, session persistence, entity/KG extraction |

All public methods remain on the Agent class — the mixins are internal implementation details.

Key Attributes

class Agent:
    def __init__(self, tools, provider, config, memory):
        self.tools = tools                      # List of Tool objects
        self._tools_by_name = {...}             # Dict for O(1) lookup
        self.provider = provider                # Provider instance
        self.prompt_builder = PromptBuilder()   # Generates system prompts
        self.parser = ToolCallParser()          # Parses tool calls
        self.config = config                    # AgentConfig
        self.memory = memory                    # Optional ConversationMemory
        self.usage = AgentUsage()               # Tracks tokens/cost
        self.analytics = AgentAnalytics()       # Optional analytics

        # Pre-build system prompt (constant per agent instance)
        self._system_prompt = self.prompt_builder.build(self.tools)

        # Local conversation history (reset per run if no memory)
        self._history: List[Message] = []

History Management

def _append_assistant_and_tool(self, assistant_content, tool_content, tool_name, tool_result=None):
    assistant_msg = Message(role=Role.ASSISTANT, content=assistant_content)
    tool_msg = Message(
        role=Role.TOOL,
        content=tool_content,
        tool_name=tool_name,
        tool_result=tool_result,
    )

    # Append to local history
    self._history.append(assistant_msg)
    self._history.append(tool_msg)

    # Also save to memory if available
    if self.memory:
        self.memory.add_many([assistant_msg, tool_msg])

Usage Tracking Convenience Methods

@property
def total_cost(self) -> float:
    return self.usage.total_cost_usd

@property
def total_tokens(self) -> int:
    return self.usage.total_tokens

def get_usage_summary(self) -> str:
    return str(self.usage)  # Pretty-printed summary

def reset_usage(self) -> None:
    self.usage = AgentUsage()

Analytics Access

def get_analytics(self) -> AgentAnalytics | None:
    return self.analytics  # None if not enabled

Best Practices

1. Choose Appropriate Iteration Limits

# Quick interactions
config = AgentConfig(max_iterations=3)

# Complex multi-step tasks
config = AgentConfig(max_iterations=10)

# Simple single-shot (no tools expected)
config = AgentConfig(max_iterations=1)

2. Set Tool Timeouts

config = AgentConfig(
    tool_timeout_seconds=30.0  # Prevent runaway tools
)

3. Use Verbose Mode for Debugging

config = AgentConfig(verbose=True)
# Prints token counts, costs, tool calls

4. Enable Cost Warnings

config = AgentConfig(
    cost_warning_threshold=0.10  # Warn at $0.10
)

5. Reset Usage Between Sessions

agent.reset_usage()  # Clear token/cost counters

6. Use Memory for Conversations

# For chatbots, Q&A systems, assistants
memory = ConversationMemory(max_messages=20)
agent = Agent(..., memory=memory)

7. Enable Analytics for Optimization

config = AgentConfig(enable_analytics=True)
agent = Agent(..., config=config)

# Later: analyze which tools are used most
analytics = agent.get_analytics()
print(analytics.summary())

Performance Optimization

1. Reuse Agent Instances

# Good: Create once, use many times
agent = Agent(tools=[...], provider=provider)
for query in queries:
    response = agent.run([Message(role=Role.USER, content=query)])

2. Use Async for Concurrency

# Process multiple queries concurrently
async def process_queries(queries):
    agent = Agent(...)
    tasks = [agent.arun([Message(role=Role.USER, content=q)]) for q in queries]
    return await asyncio.gather(*tasks)

3. Limit max_tokens

# Reduce output tokens to save cost
config = AgentConfig(max_tokens=500)

4. Choose Efficient Models

# Use mini models when appropriate
config = AgentConfig(model="gpt-4o-mini")  # 15x cheaper than gpt-4o

Testing

Unit Testing with Local Provider

from selectools.providers.stubs import LocalProvider

agent = Agent(
    tools=[my_tool],
    provider=LocalProvider(),  # No API calls
    config=AgentConfig(max_iterations=2, model="local")
)

response = agent.run([Message(role=Role.USER, content="test")])

Recording Observer Events

from selectools import AgentObserver

def test_agent_with_observer():
    called = []

    class Recorder(AgentObserver):
        def on_tool_start(self, run_id, call_id, tool_name, tool_args):
            called.append((tool_name, tool_args))

    config = AgentConfig(observers=[Recorder()])
    agent = Agent(tools=[...], provider=provider, config=config)

    agent.run([...])

    assert len(called) > 0
    assert called[0][0] == "expected_tool"

Common Pitfalls

1. Forgetting to Set API Keys

# ❌ This will raise ProviderConfigurationError
provider = OpenAIProvider()  # OPENAI_API_KEY not set

# ✅ Set via env var (run this in your shell, not in Python):
#   export OPENAI_API_KEY="sk-..."

# ✅ Or pass directly
provider = OpenAIProvider(api_key="sk-...")

2. Infinite Loops

# ❌ If LLM keeps calling tools that fail
config = AgentConfig(max_iterations=1000)  # Dangerous!

# ✅ Use reasonable limits
config = AgentConfig(max_iterations=6)  # Default is safe

3. Not Handling Tool Errors

# Agent handles tool errors gracefully by default
# But tools should still validate inputs and provide helpful errors

@tool(description="Divide two numbers")
def divide(a: float, b: float) -> str:
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return str(a / b)

| # | Script | Description |
|---|---|---|
| 01 | 01_hello_world.py | Your first agent with LocalProvider |
| 06 | 06_async_agent.py | Async agent with arun() |
| 24 | 24_traces_and_reasoning.py | Execution traces and reasoning visibility |
| 25 | 25_provider_fallback.py | FallbackProvider with circuit breaker |
| 26 | 26_batch_processing.py | Concurrent multi-prompt batch execution |

Further Reading


Next Steps: Understand how tools are defined and validated in the Tools Module.