Agent Module
June 11, 2026 · View on GitHub
Import: from selectools import Agent, AgentConfig
Stability: stable
Since: v0.13.0
from selectools import Agent, AgentConfig, tool
from selectools.providers.stubs import LocalProvider
@tool()
def search(query: str) -> str:
"""Search for information."""
return f"Results for: {query}"
agent = Agent(
tools=[search],
provider=LocalProvider(),
config=AgentConfig(model="gpt-4o"),
)
result = agent.run("Find Python tutorials")
print(result.content) # Agent response
print(result.usage) # Token counts + cost
!!! tip "See Also" - Tools -- @tool decorator and tool system - Streaming -- token-level async streaming - Memory -- conversation history management - Providers -- OpenAI, Anthropic, Gemini, Ollama adapters
File: src/selectools/agent/core.py
Classes: Agent, AgentConfig
Table of Contents
- Overview
- Agent Loop Lifecycle
- Tool Selection Process
- Configuration
- Retry and Error Handling
- Sync vs Async Execution
- Hook System (Removed)
- AgentObserver Protocol
- Memory Integration
- Streaming
- Parallel Tool Execution
- Response Caching
- Structured Output
- Execution Traces
- Reasoning Visibility
- Provider Fallback
- Batch Processing
- Tool Policy & Human-in-the-Loop
- Terminal Actions
- Implementation Details
Overview
The Agent class is the central orchestrator of the selectools framework. It manages the iterative loop of sending messages to an LLM, parsing responses for tool calls, executing those tools, and feeding results back until the task is complete.
Key Responsibilities
- Conversation Management: Maintain message history with optional memory
- Provider Communication: Call LLM APIs through provider abstraction (with fallback)
- Tool Orchestration: Detect, validate, enforce policies, and execute tool calls
- Structured Output: Validate LLM responses against Pydantic/JSON Schema with auto-retry
- Execution Traces: Record structured timeline of every step (
AgentTrace) - Reasoning Visibility: Extract and surface why the agent chose a tool
- Error Recovery: Handle failures with retries and backoff
- Observability: Notify lifecycle observers for monitoring
- Cost Tracking: Monitor token usage and costs
- Analytics: Track tool usage patterns (optional)
- Parallel Execution: Execute independent tool calls concurrently
- Batch Processing: Process multiple prompts concurrently
- Streaming: Token-level streaming with native tool support
- Response Caching: Avoid redundant LLM calls via pluggable cache layer
- Tool Policy & HITL: Declarative allow/review/deny rules with human approval
Properties & Convenience Methods
| Property / Method | Description |
|---|---|
agent.name | Returns config.name (default: "agent"). Useful for multi-agent identification. |
agent(messages, **kw) | Shorthand for agent.run(messages, **kw) via __call__. |
agent.ask(prompt) | Shorthand for run() with a single string prompt. |
agent.aask(prompt) | Async version of ask(). |
# Named agents for multi-agent systems
researcher = Agent(tools=[search], config=AgentConfig(name="researcher"))
print(researcher.name) # "researcher"
# Call the agent directly
result = researcher("Find info about Python") # same as researcher.run(...)
Core Dependencies
from .types import Message, Role
from .tools import Tool
from .prompt import PromptBuilder
from .parser import ToolCallParser
from .structured import parse_and_validate, build_schema_instruction
from .trace import AgentTrace, TraceStep
from .policy import ToolPolicy, PolicyDecision
from .providers.base import Provider
from .providers.fallback import FallbackProvider
from .memory import ConversationMemory # Optional
from .usage import AgentUsage
from .analytics import AgentAnalytics # Optional
Agent Loop Lifecycle
State Machine Diagram
flowchart TD
START([START]) --> LOAD["Load Message History\n(from memory if set)"]
LOAD --> LOOP["Iteration Loop\n(max_iterations times)"]
LOOP --> HOOK["on_iteration_start hook"]
LOOP --> BUILD["Build Prompt with Tools"]
BUILD --> LLM["Call LLM Provider\n(with retries)"]
LLM --> PARSE["Parse Response\n(ToolCallParser)"]
PARSE --> TC{Tool call?}
TC -- No --> RETURN["Return Final Response"]
TC -- Yes --> VALIDATE{"Valid tool\n& params?"}
VALIDATE -- Invalid --> ERR["Append Error Message"]
ERR --> LOOP
VALIDATE -- Valid --> EXEC["Execute Tool\n(with timeout)"]
EXEC --> APPEND["Append Result to History"]
APPEND --> LOOP
Execution Flow
1. Initialization
agent = Agent(
tools=[search_tool, calculator_tool],
provider=OpenAIProvider(),
config=AgentConfig(max_iterations=6),
memory=ConversationMemory(max_messages=20)
)
The agent initializes with:
- Tool registry (
_tools_by_namedict for O(1) lookup) - System prompt (built from tool schemas)
- Empty history
- Usage tracker
- Optional analytics tracker
2. Run Method Entry
response = agent.run([
Message(role=Role.USER, content="Search for Python and calculate 2+2")
])
Steps:
- Call
on_agent_starthook - Load history from memory (if available)
- Append new messages to history
- Enter iteration loop
3. Iteration Loop
iteration = 0
while iteration < self.config.max_iterations:
iteration += 1
# 1. Call hook
self._call_hook("on_iteration_start", iteration, self._history)
# 2. Call provider
response_text = self._call_provider()
# 3. Parse response
parse_result = self.parser.parse(response_text)
# 4. Check for tool call
if not parse_result.tool_call:
# No tool call - we're done!
return Message(role=Role.ASSISTANT, content=response_text)
# 5. Execute tool
tool = self._tools_by_name.get(tool_name)
result = self._execute_tool_with_timeout(tool, parameters)
# 6. Append to history
self._append_assistant_and_tool(response_text, result, tool_name)
# 7. Loop continues...
4. Tool Execution
def _execute_tool_with_timeout(self, tool, parameters, chunk_callback):
if not self.config.tool_timeout_seconds:
return tool.execute(parameters, chunk_callback)
# Execute with timeout
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(tool.execute, parameters, chunk_callback)
try:
return future.result(timeout=self.config.tool_timeout_seconds)
except TimeoutError:
future.cancel()
raise TimeoutError(f"Tool '{tool.name}' timed out")
Features:
- Optional timeout enforcement
- Chunk callback for streaming tools
- Exception wrapping for better errors
- Analytics tracking (if enabled)
5. Loop Termination
The loop exits when:
- No tool call detected → Return LLM response as final answer
- Max iterations reached → Return timeout message
- Exception raised → Propagate to caller
Tool Selection Process
How the Agent Decides Which Tool to Use
The agent doesn't directly "decide" - it relies on the LLM to make the decision based on the system prompt and conversation context.
graph TD
A["System Prompt (PromptBuilder)\nTool call contract + tool JSON schemas"] --> C["LLM\nGPT-4o / Claude / Gemini / etc."]
B["Conversation History\nUSER: Search for Python tutorials"] --> C
C --> D["LLM Response\nTOOL_CALL: search\nparams: query = 'Python tutorials'"]
Validation Flow
# 1. Parse the tool call
parse_result = self.parser.parse(response_text)
if parse_result.tool_call:
tool_name = parse_result.tool_call.tool_name
parameters = parse_result.tool_call.parameters
# 2. Check if tool exists
tool = self._tools_by_name.get(tool_name)
if not tool:
error_msg = f"Unknown tool '{tool_name}'. Available: {list(self._tools_by_name.keys())}"
# Append error and continue loop
self._append_assistant_and_tool(response_text, error_msg, tool_name)
continue
# 3. Validate parameters
try:
tool.validate(parameters)
except ToolValidationError as e:
# Append validation error and continue
self._append_assistant_and_tool(response_text, str(e), tool_name)
continue
# 4. Execute tool
result = tool.execute(parameters)
Error Handling Strategy:
The agent doesn't fail on invalid tool calls. Instead:
- Append error message to conversation
- Let LLM see the error
- LLM can retry with corrections or choose a different approach
This creates a self-correcting loop.
Configuration
AgentConfig Dataclass
@dataclass
class AgentConfig:
# Model selection
model: str = "gpt-5-mini"
temperature: float = 0.0
max_tokens: int = 1000
# Loop control
max_iterations: int = 6
# Reliability
max_retries: int = 2
retry_backoff_seconds: float = 1.0
rate_limit_cooldown_seconds: float = 5.0
request_timeout: Optional[float] = 30.0
tool_timeout_seconds: Optional[float] = None
# Cost management
cost_warning_threshold: Optional[float] = None
# Observability
verbose: bool = False
enable_analytics: bool = False
observers: List[AgentObserver] = field(default_factory=list)
# Execution mode
routing_only: bool = False
parallel_tool_execution: bool = True
# Streaming
stream: bool = False
# Caching
cache: Optional[Cache] = None # InMemoryCache, RedisCache, or custom
# Tool Safety
tool_policy: Optional[ToolPolicy] = None # allow/review/deny rules
confirm_action: Optional[ConfirmAction] = None # Human-in-the-loop callback
approval_timeout: float = 60.0 # Seconds before auto-deny
# Sessions & Persistence (v0.16.0)
session_store: Optional[SessionStore] = None # Auto-save/load conversation state
session_id: Optional[str] = None # Unique session identifier
# Summarize-on-Trim (v0.16.0)
summarize_on_trim: bool = False # Summarize trimmed messages before dropping
summarize_provider: Optional[Provider] = None # Provider for summarization (defaults to agent's)
summarize_model: Optional[str] = None # Model for summarization (use a cheap model)
summarize_max_tokens: int = 150 # Max tokens for the summary response
# Advanced Memory (v0.16.0)
entity_memory: Optional[EntityMemory] = None # LLM-based entity extraction
knowledge_graph: Optional[KnowledgeGraphMemory] = None # Relationship triple extraction
knowledge_memory: Optional[KnowledgeMemory] = None # Cross-session durable memory
Configuration Patterns
Production Config
config = AgentConfig(
model="gpt-4o-mini", # Cost-effective
temperature=0.0, # Deterministic
max_tokens=2000,
max_iterations=10,
max_retries=3,
retry_backoff_seconds=2.0,
rate_limit_cooldown_seconds=10.0,
request_timeout=60.0,
tool_timeout_seconds=30.0,
cost_warning_threshold=0.50, # Alert at \$0.50
verbose=False,
enable_analytics=True
)
Production Config with Caching
from selectools import InMemoryCache
cache = InMemoryCache(max_size=2000, default_ttl=600)
config = AgentConfig(
model="gpt-4o-mini",
temperature=0.0,
cache=cache, # Enable response caching
max_retries=3,
cost_warning_threshold=0.50,
)
Development Config
config = AgentConfig(
model="gpt-4o",
verbose=True, # See what's happening
max_iterations=3, # Fast feedback
stream=True, # See responses live
)
Budget-Conscious Config
config = AgentConfig(
model="gpt-4o-mini",
max_tokens=500,
max_iterations=3,
cost_warning_threshold=0.01,
)
Retry and Error Handling
Retry Logic Flow
flowchart TD
A["Provider Call"] --> B["Attempt N"]
B --> C{Success?}
C -- Yes --> D["Return response"]
C -- No --> E{Rate limit error?}
E -- Yes --> F["Sleep(rate_limit_cooldown * attempt)"]
E -- No --> G["Sleep(retry_backoff * attempt)"]
F --> G
G --> H{Retries remaining?}
H -- Yes --> B
H -- No --> I["Return error message"]
Implementation
def _call_provider(self, stream_handler=None):
attempts = 0
last_error = None
while attempts <= self.config.max_retries:
attempts += 1
try:
# Call provider
response_text, usage_stats = self.provider.complete(
model=self.config.model,
system_prompt=self._system_prompt,
messages=self._history,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
timeout=self.config.request_timeout,
)
# Track usage
self.usage.add_usage(usage_stats)
return response_text
except ProviderError as exc:
last_error = str(exc)
if attempts > self.config.max_retries:
break
# Rate limit handling
if self._is_rate_limit_error(last_error):
time.sleep(self.config.rate_limit_cooldown_seconds * attempts)
# Standard backoff
if self.config.retry_backoff_seconds:
time.sleep(self.config.retry_backoff_seconds * attempts)
return f"Provider error: {last_error or 'unknown error'}"
Rate Limit Detection
def _is_rate_limit_error(self, message: str) -> bool:
lowered = message.lower()
return "rate limit" in lowered or "429" in lowered
Tool Execution Errors
Tool errors don't cause the entire agent to fail:
try:
result = self._execute_tool_with_timeout(tool, parameters)
self._call_hook("on_tool_end", tool.name, result, duration)
except Exception as exc:
self._call_hook("on_tool_error", tool.name, exc, parameters)
error_message = f"Error executing tool '{tool.name}': {exc}"
self._append_assistant_and_tool(response_text, error_message, tool.name)
# Continue to next iteration - let LLM handle the error
continue
Sync vs Async Execution
All three execution methods share the same parameters and feature set (as of v0.16.3):
| Parameter | Type | Default | Description |
|---|---|---|---|
messages | str | List[Message] | required | User prompt or message list |
stream_handler | Callable[[str], None] | None | Callback for streaming chunks (run/arun only) |
response_format | ResponseFormat | None | Pydantic model or JSON Schema for structured output |
parent_run_id | str | None | Links trace to a parent agent's run for nested orchestration |
Sync Execution (run())
response = agent.run([Message(role=Role.USER, content="Hello")])
When to use:
- Simple scripts
- Jupyter notebooks
- Single-threaded applications
- Blocking I/O is acceptable
Async Execution (arun())
response = await agent.arun([Message(role=Role.USER, content="Hello")])
When to use:
- Web frameworks (FastAPI, aiohttp)
- Concurrent operations
- High-performance applications
- Multiple agents in parallel
Implementation Differences
Sync Path
def run(self, messages, stream_handler=None):
# Provider call (blocking)
response_text, usage_stats = self.provider.complete(...)
# Tool execution (blocking)
result = tool.execute(parameters)
Async Path
async def arun(self, messages, stream_handler=None):
# Provider call (non-blocking)
if hasattr(self.provider, "acomplete"):
response_text, usage_stats = await self.provider.acomplete(...)
else:
# Fallback: run sync in executor
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
response_text, usage_stats = await loop.run_in_executor(
executor, lambda: self.provider.complete(...)
)
# Tool execution (non-blocking)
result = await tool.aexecute(parameters)
Async Tool Support
Tools can be async:
@tool(description="Fetch data from API")
async def fetch_data(url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
The agent automatically detects and handles async tools via tool.is_async flag.
Hook System (Removed)
Removed in v1.0 —
AgentConfig.hooks(deprecated since v0.16.5) has been removed. Passinghooks=toAgentConfignow raisesTypeError. UseAgentObserverorAsyncAgentObserverinstead. Seedocs/MIGRATION_1.0.mdanddocs/decisions/002-observer-replaces-hooks.md.
Migration is mechanical — each hook key maps to an observer method with richer
arguments (run_id, and call_id for tool events):
| Legacy hook key | Observer method |
|---|---|
on_agent_start | on_run_start(run_id, messages, system_prompt) |
on_agent_end | on_run_end(run_id, result) |
on_iteration_start | on_iteration_start(run_id, iteration, messages) |
on_iteration_end | on_iteration_end(run_id, iteration, response) |
on_tool_start | on_tool_start(run_id, call_id, tool_name, tool_args) |
on_tool_chunk | on_tool_chunk(run_id, call_id, tool_name, chunk) |
on_tool_end | on_tool_end(run_id, call_id, tool_name, result, duration_ms) |
on_tool_error | on_tool_error(run_id, call_id, tool_name, error, tool_args, duration_ms) |
on_llm_start | on_llm_start(run_id, messages, model, system_prompt) |
on_llm_end | on_llm_end(run_id, response, usage) |
on_error | on_error(run_id, error, context) |
Note: hook duration was in seconds; observer duration_ms is in milliseconds.
# Before (removed)
config = AgentConfig(hooks={"on_tool_start": lambda name, args: log(name)})
# After
from selectools import AgentObserver
class ToolLogger(AgentObserver):
def on_tool_start(self, run_id, call_id, tool_name, tool_args):
log(tool_name)
config = AgentConfig(observers=[ToolLogger()])
Design Decision (unchanged): Observer errors never break the agent. They're for observability, not control flow.
AgentObserver Protocol
File: src/selectools/observer.py
Classes: AgentObserver, LoggingObserver
The AgentObserver protocol is the class-based notification system for structured observability integrations (Langfuse, OpenTelemetry, Datadog). Every callback receives a run_id for cross-request correlation, and tool callbacks also receive a call_id for matching parallel tool start/end pairs.
Quick Start
from selectools import Agent, AgentConfig, AgentObserver, LoggingObserver
class MyObserver(AgentObserver):
def on_llm_start(self, run_id, messages, model, system_prompt):
print(f"[{run_id}] LLM call to {model}")
def on_tool_end(self, run_id, call_id, tool_name, result, duration_ms):
print(f"[{run_id}] {tool_name} finished in {duration_ms:.1f}ms")
agent = Agent(
tools=[...], provider=provider,
config=AgentConfig(observers=[MyObserver(), LoggingObserver()]),
)
All 31 Lifecycle Events
| Event | Scope | Parameters (after run_id) | When |
|---|---|---|---|
on_run_start | Run | messages, system_prompt | Start of run()/arun()/astream() |
on_run_end | Run | result (AgentResult) | Agent produces final result |
on_error | Run | error, context | Unrecoverable error |
on_llm_start | LLM | messages, model, system_prompt | Before each provider call |
on_llm_end | LLM | response, usage | After each provider call |
on_cache_hit | LLM | model, response | Response served from cache |
on_usage | LLM | usage (UsageStats) | Per-call token/cost stats |
on_llm_retry | LLM | attempt, max_retries, error, backoff_seconds | LLM call about to be retried |
on_tool_start | Tool | call_id, tool_name, tool_args | Before tool execution |
on_tool_end | Tool | call_id, tool_name, result, duration_ms | After successful tool execution |
on_tool_error | Tool | call_id, tool_name, error, tool_args, duration_ms | Tool raised an exception |
on_tool_chunk | Tool | call_id, tool_name, chunk | Streaming tool emits a chunk |
on_iteration_start | Iteration | iteration, messages | Start of agent loop iteration |
on_iteration_end | Iteration | iteration, response | End of agent loop iteration |
on_batch_start | Batch | batch_id*, prompts_count | Before batch()/abatch() |
on_batch_end | Batch | batch_id*, results_count, errors_count, total_duration_ms | After all batch items complete |
on_policy_decision | Policy | tool_name, decision, reason, tool_args | After tool policy evaluation |
on_structured_validate | Structured | success, attempt, error | After structured output validation |
on_provider_fallback | Fallback | failed_provider, next_provider, error | FallbackProvider switches provider |
on_memory_trim | Memory | messages_removed, messages_remaining, reason | Memory enforces limits |
on_session_load | Session | session_id, message_count | Session loaded from store (v0.16.0) |
on_session_save | Session | session_id, message_count | Session saved to store (v0.16.0) |
on_memory_summarize | Memory | summary, messages_summarized | Trimmed messages summarized (v0.16.0) |
on_entity_extraction | Memory | entities, turn_count | Entities extracted from turn (v0.16.0) |
*on_batch_start/on_batch_end use batch_id instead of run_id.
Built-in LoggingObserver
Emits structured JSON events to Python's logging module:
import logging
logging.basicConfig(level=logging.INFO)
agent = Agent(
tools=[...], provider=provider,
config=AgentConfig(observers=[LoggingObserver()]),
)
Output:
{"event": "run_start", "run_id": "a3f2...", "model": "gpt-4o-mini", "timestamp": 1708099200.0}
{"event": "llm_end", "run_id": "a3f2...", "tokens": 150, "duration_ms": 312.5}
{"event": "tool_end", "run_id": "a3f2...", "tool": "search", "duration_ms": 45.2}
Why Observers (vs the removed hooks dict)
| Aspect | Hooks (dict, removed in v1.0) | AgentObserver |
|---|---|---|
| Correlation | Manual (closures, thread-local) | Built-in run_id + call_id |
| Multiple consumers | One callback per event | Multiple observers |
| Event coverage | 8 events | 31 events (including batch, fallback, retry, memory, budget, cancellation, model switch) |
| Type safety | Dict keys are strings | Protocol methods with signatures |
| Use case | Quick debugging, simple logging | Production observability (Langfuse, OTel, Datadog) |
AsyncAgentObserver
For async-native applications (FastAPI, aiohttp, async SQLAlchemy), AsyncAgentObserver
provides 28 async a_on_* methods that mirror the sync observer:
from selectools import AsyncAgentObserver
class DBObserver(AsyncAgentObserver):
blocking = True # await inline — must complete before next tool
async def a_on_tool_end(self, run_id, call_id, tool_name, result, duration_ms):
await db.execute("INSERT INTO events ...")
class WebhookObserver(AsyncAgentObserver):
blocking = False # fire-and-forget via asyncio.ensure_future
async def a_on_run_end(self, run_id, result):
await httpx.post("https://hooks.example.com/...", json={...})
agent = Agent(
tools=[...],
provider=provider,
config=AgentConfig(observers=[DBObserver(), WebhookObserver()]),
)
blocking=True: Awaited inline — the agent loop waits for completion. Use for DB writes, rate limiting, result enrichment.blocking=False(default): Dispatched viaasyncio.ensure_future(). Use for webhooks, logging, audit trails.
Async observers are called in arun() and astream() after each sync observer notification.
In sync run(), only sync observers fire.
Trace Metadata & Nested Agents
config = AgentConfig(
parent_run_id="outer-agent-run-id",
trace_metadata={"user_id": "u123", "environment": "production"},
observers=[MyObserver()],
)
result = agent.run("classify this")
print(result.trace.parent_run_id) # "outer-agent-run-id"
print(result.trace.metadata) # {"user_id": "u123", "environment": "production"}
# Export as OpenTelemetry spans
spans = result.trace.to_otel_spans()
Memory Integration
Basic Memory
memory = ConversationMemory(max_messages=20)
agent = Agent(tools=[...], provider=provider, memory=memory)
# First turn
response1 = agent.run([Message(role=Role.USER, content="My name is Alice")])
# Second turn - history is preserved
response2 = agent.run([Message(role=Role.USER, content="What's my name?")])
# LLM can reference "Alice" from previous turn
Flow:
graph TD
A["run() called"] --> B["memory.get_history()"]
B --> C["Append new messages"]
C --> D["memory.add_many(new_messages)"]
D --> E["Execute loop"]
E --> F["memory.add(final_response)"]
F --> G["Return"]
Without Memory
agent = Agent(tools=[...], provider=provider) # No memory
# Each call is independent
response = agent.run([Message(role=Role.USER, content="Hello")])
History is local to each run() call.
Memory Limits
memory = ConversationMemory(
max_messages=20, # Keep last 20 messages
max_tokens=4000 # Or limit by token count
)
When limits are exceeded, oldest messages are dropped (sliding window).
Persistent Sessions
Auto-save and auto-load conversation state across process restarts using session_store and session_id:
from selectools.sessions import JsonFileSessionStore
store = JsonFileSessionStore(directory="./sessions")
agent = Agent(
tools=[...], provider=provider,
config=AgentConfig(session_store=store, session_id="user-123"),
)
# First run — auto-loads existing session (if any), auto-saves after
result = agent.run([Message(role=Role.USER, content="My name is Alice")])
# Later (even after restart) — session is restored automatically
result = agent.run([Message(role=Role.USER, content="What's my name?")])
# Agent knows: "Alice"
Four backends are available: JsonFileSessionStore, SQLiteSessionStore, RedisSessionStore, SupabaseSessionStore. All support namespace isolation; the file/SQLite/Redis backends also support TTL-based expiry.
See Sessions Module for backend details and TTL configuration.
Summarize-on-Trim
When messages are trimmed by the sliding window, optionally generate a summary of the dropped messages and inject it as system context:
agent = Agent(
tools=[...], provider=provider,
memory=ConversationMemory(max_messages=30),
config=AgentConfig(
summarize_on_trim=True,
summarize_provider=provider, # Provider for summarization
summarize_model="gpt-4o-mini", # Use a cheap/fast model
summarize_max_tokens=150, # Max tokens for the summary
),
)
Flow: When _enforce_limits() trims messages → the trimmed messages are sent to the summarize provider → a 2-3 sentence summary is generated → stored in memory.summary → injected as a system-level context message on subsequent turns.
See Memory Module for implementation details.
Entity Memory
Automatically extract named entities (people, organizations, projects, etc.) from each turn and inject them as context:
from selectools import EntityMemory
entity_memory = EntityMemory(provider=provider)
agent = Agent(
tools=[...], provider=provider, memory=memory,
config=AgentConfig(entity_memory=entity_memory),
)
agent.run([Message(role=Role.USER, content="I'm working with Alice from Acme Corp")])
# Extracts: Alice (person, Acme Corp), Acme Corp (organization)
# Injected as [Known Entities] in system prompt on next turn
See Entity Memory Module for entity types, deduplication, and LRU pruning.
Knowledge Graph Memory
Extract (subject, relation, object) triples from conversation and query them for context injection:
from selectools import KnowledgeGraphMemory
kg = KnowledgeGraphMemory(provider=provider, storage="sqlite")
agent = Agent(
tools=[...], provider=provider, memory=memory,
config=AgentConfig(knowledge_graph=kg),
)
agent.run([Message(role=Role.USER, content="Alice manages Project Alpha")])
# Extracts: (Alice, manages, Project Alpha)
# Injected as [Known Relationships] in system prompt on next turn
See Knowledge Graph Module for storage backends and querying.
Cross-Session Knowledge Memory
Persistent knowledge that survives across sessions — daily logs plus a long-term fact store:
from selectools import KnowledgeMemory
knowledge = KnowledgeMemory(directory="./workspace", recent_days=2)
agent = Agent(
tools=[...], provider=provider,
config=AgentConfig(knowledge_memory=knowledge),
)
# Auto-registers a `remember` tool — the agent can save facts explicitly
# [Long-term Memory] and [Recent Memory] injected into system prompt
See Knowledge Memory Module for daily logs, fact storage, and retention configuration.
Context Injection Order
When multiple memory features are active, context is injected into the system prompt in this order:
1. [Conversation Summary] ← summarize_on_trim
2. [Known Entities] ← entity_memory
3. [Known Relationships] ← knowledge_graph
4. [Long-term Memory] ← knowledge_memory (persistent facts)
5. [Recent Memory] ← knowledge_memory (daily logs)
Each section is only present when the corresponding feature is configured and has data.
Streaming
Agent.astream()
The astream() method provides token-by-token streaming with full feature parity with run() and arun() (as of v0.16.3). It supports response_format, parent_run_id, input/output guardrails, coherence checks, knowledge context injection, entity/KG extraction, session save, structured output validation, analytics, and verbose output.
async for item in agent.astream([Message(role=Role.USER, content="Search for Python")]):
if isinstance(item, StreamChunk):
print(item.content, end="", flush=True)
elif isinstance(item, AgentResult):
print(f"\nDone in {item.iterations} iterations")
Signature:
async def astream(
messages: Union[str, List[Message]],
response_format: Optional[ResponseFormat] = None, # Structured output
parent_run_id: Optional[str] = None, # Trace linking
) -> AsyncGenerator[Union[StreamChunk, AgentResult], None]:
How It Works
- Shared
_prepare_run()sets up trace, guardrails, memory, knowledge context (identical to run/arun) - Provider streams text deltas and tool call deltas via
astream() - Text chunks are yielded as
StreamChunkobjects - Shared
_process_response()applies output guardrails, parses tool calls, extracts reasoning - Tool calls are executed with coherence checks, output screening, analytics, and usage tracking
- Shared
_finalize_run()saves session, extracts entities/KG, builds fullAgentResult - Final
AgentResultis yielded (includesparsed,reasoning,reasoning_history,provider_used)
Provider Protocol
All providers implement astream() returning Union[str, ToolCall]:
- Text deltas: Yielded as raw
strchunks - Tool calls: Yielded as complete
ToolCallobjects when ready
Fallback Behavior
If a provider doesn't support astream(), the agent falls back to:
acomplete()(async non-streaming)complete()via executor (sync in async wrapper)
The response is still yielded as a single StreamChunk for API consistency.
Parallel Tool Execution
Overview
When an LLM requests multiple tool calls in a single response (common with native function calling), the agent executes them concurrently instead of sequentially.
Configuration
config = AgentConfig(
parallel_tool_execution=True # Default: enabled
)
Set to False to force sequential execution.
How It Works
Async (arun, astream)
Uses asyncio.gather() to run all tool calls concurrently:
results = await asyncio.gather(*[run_tool(tc) for tc in tool_calls])
Sync (run)
Uses ThreadPoolExecutor with one worker per tool call:
with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
futures = [pool.submit(run_tool, tc) for tc in tool_calls]
results = [f.result() for f in futures]
Guarantees
- Result ordering: Tool results are appended to history in the same order as the original tool calls, regardless of completion order
- Error isolation: If one tool fails, others still complete successfully
- Hook invocation:
on_tool_start,on_tool_end, andon_tool_errorfire for every tool - Single tool optimization: When only one tool is called, the sequential path is used (no overhead)
Example
Three tools each taking 0.15s:
- Sequential: ~0.45s total
- Parallel: ~0.15s total (3x speedup)
# Automatic - no code changes needed
agent = Agent(
tools=[weather_tool, stock_tool, news_tool],
provider=OpenAIProvider(),
config=AgentConfig(parallel_tool_execution=True)
)
# LLM requests all 3 tools → executed concurrently
result = await agent.arun([Message(role=Role.USER, content="...")])
Response Caching
Overview
The agent supports pluggable response caching to avoid redundant LLM calls. When AgentConfig(cache=...) is set, the agent checks the cache before every provider.complete() / provider.acomplete() call. On a cache hit, the stored (Message, UsageStats) is returned immediately without calling the LLM.
Architecture
flowchart TD
A["Agent._call_provider()"] --> B["CacheKeyBuilder.build()\nSHA-256 hex digest"]
B --> C{"cache.get(key)"}
C -- HIT --> D["Return cached response\nfire on_llm_end hook"]
C -- MISS --> E["provider.complete(...)"]
E --> F["cache.set(key, response)"]
Cache Protocol
Any object satisfying the Cache protocol can be used:
@runtime_checkable
class Cache(Protocol):
def get(self, key: str) -> Optional[Tuple[Any, Any]]: ...
def set(self, key: str, value: Tuple[Any, Any], ttl: Optional[int] = None) -> None: ...
def delete(self, key: str) -> bool: ...
def clear(self) -> None: ...
@property
def stats(self) -> CacheStats: ...
Built-in Backends
InMemoryCache
Thread-safe LRU + TTL cache with zero external dependencies:
from selectools import InMemoryCache
cache = InMemoryCache(
max_size=1000, # Max entries (LRU eviction)
default_ttl=300, # 5 minutes
)
Features:
OrderedDict-based O(1) LRU operations- Per-entry TTL with monotonic timestamp expiry
- Thread-safe via
threading.Lock CacheStatstracking (hits, misses, evictions, hit_rate)
RedisCache
Distributed TTL cache for multi-process deployments:
from selectools.cache_redis import RedisCache
cache = RedisCache(
url="redis://localhost:6379/0",
prefix="selectools:",
default_ttl=300,
)
Features:
- Server-side TTL management
- Pickle-serialized
(Message, UsageStats)entries - Key prefix namespacing
- Requires optional dependency:
pip install selectools[cache]
Cache Key Generation
CacheKeyBuilder creates deterministic SHA-256 keys from request parameters:
from selectools import CacheKeyBuilder
key = CacheKeyBuilder.build(
model="gpt-4o",
system_prompt="You are a helpful assistant.",
messages=[Message(role=Role.USER, content="Hello")],
tools=[my_tool],
temperature=0.0,
)
# → "selectools:a3f2b8c1d4e5..."
Inputs hashed: model, system_prompt, messages (role + content + tool_calls), tools (name + description + parameters), temperature.
Guarantees:
- Same inputs always produce the same key
- Different inputs produce different keys
- Tool ordering is preserved in the hash
What Gets Cached
| Call Type | Cached? | Reason |
|---|---|---|
provider.complete() | Yes | Deterministic request/response |
provider.acomplete() | Yes | Deterministic request/response |
provider.astream() | No | Non-replayable generator |
| Tool execution results | No | Side effects possible |
Usage Examples
Basic In-Memory Caching
from selectools import Agent, AgentConfig, InMemoryCache
cache = InMemoryCache(max_size=500, default_ttl=600)
config = AgentConfig(model="gpt-4o-mini", cache=cache)
agent = Agent(tools=[my_tool], provider=provider, config=config)
# First call → cache miss → LLM called
response1 = agent.run([Message(role=Role.USER, content="What is Python?")])
# Reset history, same question → cache hit → instant response
agent.reset()
response2 = agent.run([Message(role=Role.USER, content="What is Python?")])
print(cache.stats)
# CacheStats(hits=1, misses=1, evictions=0, hit_rate=50.00%)
Distributed Redis Caching
from selectools.cache_redis import RedisCache
cache = RedisCache(url="redis://my-redis:6379/0", default_ttl=900)
config = AgentConfig(cache=cache)
# Cache is shared across processes/servers
agent = Agent(tools=[...], provider=provider, config=config)
Monitoring Cache Performance
stats = cache.stats
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Hits: {stats.hits}, Misses: {stats.misses}")
print(f"Evictions: {stats.evictions}")
Verbose Mode
When verbose=True, cache hits are logged:
[agent] cache hit -- skipping provider call
Integration with Usage Tracking
Cache hits still contribute to AgentUsage. The stored UsageStats is replayed via agent.usage.add_usage(), so cost tracking remains accurate even when responses come from cache.
Structured Output
Overview
Pass a Pydantic BaseModel or dict JSON Schema as response_format to get typed, validated results from the LLM. The agent injects schema instructions into the system prompt, extracts JSON from the response, validates it, and retries on failure.
Usage
from pydantic import BaseModel
from typing import Literal
class Classification(BaseModel):
intent: Literal["billing", "support", "sales", "cancel"]
confidence: float
priority: Literal["low", "medium", "high"]
result = agent.ask("I want to cancel my account", response_format=Classification)
print(result.parsed) # Classification(intent="cancel", confidence=0.95, priority="high")
print(result.content) # Raw JSON string
How It Works
build_schema_instruction(schema)generates a prompt fragment describing the expected JSON shape- Schema instruction is appended to the system prompt for the duration of the run
- LLM response is passed through
extract_json()to isolate the JSON block parse_and_validate()validates against the Pydantic model or JSON Schema- On validation failure, the error is fed back to the LLM for a retry
result.parsedcontains the typed object;result.contenthas the raw string
Structured Retry Budget (v0.22.0 — BUG-34)
Structured-validation retries now have their own budget, decoupled from
max_iterations. Previously, a single global counter was shared between
tool-execution iterations and structured-validation retries — an agent with
max_iterations=3 and an LLM that failed JSON validation 3 times would
terminate before reaching RetryConfig.max_retries.
agent = Agent(
tools=[...],
provider=provider,
config=AgentConfig(
max_iterations=5, # Tool-execution budget
retry=RetryConfig(max_retries=3), # Structured-validation retry budget (independent)
),
)
max_iterationscontrols how many times tools can be calledRetryConfig.max_retriescontrols how many structured-validation retries are allowed- A validation failure increments the retry counter without consuming a tool iteration
Supported Formats
- Pydantic v2
BaseModel: Full schema generation with type coercion dictJSON Schema: Raw JSON Schema for non-Pydantic users
ResponseFormat Type
ResponseFormat is a type alias for what response_format accepts:
from selectools import ResponseFormat # Union[Type[Any], Dict[str, Any]]
It accepts either a Pydantic BaseModel subclass or a raw JSON Schema dict.
Standalone Helpers
These utilities can be used independently for custom validation pipelines:
from selectools.structured import (
extract_json,
schema_from_response_format,
parse_and_validate,
build_schema_instruction,
validation_retry_message,
)
| Function | Description |
|---|---|
extract_json(text) | Extract the first JSON object/array from text (handles code blocks, brace-balanced extraction). Returns None if no JSON found. |
schema_from_response_format(fmt) | Convert a Pydantic model or dict to a JSON Schema dict. |
parse_and_validate(text, fmt) | Extract JSON from text, validate against schema, return typed object. Raises ValueError on failure. |
build_schema_instruction(schema) | Generate the system prompt fragment that instructs the LLM to produce JSON matching the schema. |
validation_retry_message(error) | Generate the retry message sent to the LLM when validation fails. |
Example — custom extraction pipeline:
from selectools.structured import extract_json, parse_and_validate
from pydantic import BaseModel
class Sentiment(BaseModel):
label: str
score: float
raw_text = 'Here is the analysis: ```json\n{"label": "positive", "score": 0.95}\n```'
json_str = extract_json(raw_text) # '{"label": "positive", "score": 0.95}'
result = parse_and_validate(raw_text, Sentiment) # Sentiment(label="positive", score=0.95)
TraceStep Types for Structured Output
When structured validation fails, a structured_retry step appears in the trace:
for step in result.trace:
if step.type == "structured_retry":
print(f"Validation failed: {step.error}")
Execution Traces
Overview
Every run() / arun() automatically produces an AgentTrace — a structured timeline of the entire execution. Access it via result.trace.
Usage
result = agent.run("Classify this ticket")
for step in result.trace:
print(f"{step.type} | {step.duration_ms:.0f}ms | {step.summary}")
result.trace.to_json("trace.json")
print(result.trace.timeline())
llm_steps = result.trace.filter(type="llm_call")
total_llm_ms = sum(s.duration_ms for s in llm_steps)
TraceStep Types
| Type | Description |
|---|---|
llm_call | Provider API call with model, tokens, duration |
tool_selection | LLM chose a tool (name, args, reasoning) |
tool_execution | Tool was executed (name, result summary, duration) |
cache_hit | Response served from cache |
error | Error during execution |
structured_retry | Structured output validation failed, retrying |
guardrail | Input/output guardrail triggered (v0.15.0) |
coherence_check | Coherence check blocked a tool call (v0.15.0) |
output_screening | Tool output screening detected injection (v0.15.0) |
session_load | Session loaded from store (v0.16.0) |
session_save | Session saved to store (v0.16.0) |
memory_summarize | Trimmed messages summarized (v0.16.0) |
entity_extraction | Entities extracted from conversation (v0.16.0) |
kg_extraction | Knowledge graph triples extracted (v0.16.0) |
AgentTrace Methods
trace.to_dict()— Serialize to dicttrace.to_json(filepath)— Write JSON to filetrace.timeline()— Human-readable timeline stringtrace.filter(type=...)— Filter steps by typetrace.total_duration_ms— Total execution time
Reasoning Visibility
Overview
LLMs often return explanatory text alongside tool calls. This reasoning is now captured and surfaced on AgentResult.
Usage
result = agent.run("Route this customer request")
print(result.reasoning)
# "The customer is asking about billing charges, routing to billing_support"
for i, reasoning in enumerate(result.reasoning_history):
print(f"Iteration {i}: {reasoning}")
How It Works
The agent extracts text content from LLM responses that precede or accompany tool call decisions. No extra LLM calls are needed — it purely captures what providers already return but previously discarded.
result.reasoning— reasoning text from the final tool selectionresult.reasoning_history— list of reasoning strings, one per iterationstep.reasoningontool_selectiontrace steps
Provider Fallback
Overview
FallbackProvider wraps multiple providers in priority order. If one fails, the next is tried automatically with circuit breaker protection.
Usage
from selectools import FallbackProvider, OpenAIProvider, AnthropicProvider
provider = FallbackProvider([
OpenAIProvider(default_model="gpt-4o-mini"),
AnthropicProvider(default_model="claude-haiku"),
])
agent = Agent(tools=[...], provider=provider)
Circuit Breaker
After max_failures consecutive failures, a provider is skipped for cooldown_seconds:
provider = FallbackProvider(
providers=[openai, anthropic, local],
max_failures=3,
cooldown_seconds=60,
on_fallback=lambda name, error: print(f"Skipping {name}: {error}"),
)
Supported Methods
FallbackProvider implements the full Provider protocol: complete(), acomplete(), stream(), astream().
Batch Processing
Overview
Process multiple prompts concurrently with configurable parallelism.
Usage
# Sync
results = agent.batch(
["Cancel my sub", "How do I upgrade?", "Payment failed"],
max_concurrency=5,
)
# Async
results = await agent.abatch(
["Cancel my sub", "How do I upgrade?", "Payment failed"],
max_concurrency=10,
)
Guarantees
- Returns
list[AgentResult]in same order as input - Per-request error isolation (one failure doesn't cancel the batch)
- Respects
response_formatif provided on_progress(completed, total)callback for monitoring
Tool Policy & Human-in-the-Loop
Overview
Declarative allow/review/deny rules evaluated before every tool execution, with optional human approval for flagged tools.
Tool Policy
from selectools import ToolPolicy
policy = ToolPolicy(
allow=["search_*", "read_*", "get_*"],
review=["send_*", "create_*", "update_*"],
deny=["delete_*", "drop_*"],
deny_when=[{"tool": "send_email", "arg": "to", "pattern": "*@external.com"}],
)
config = AgentConfig(tool_policy=policy)
Evaluation order: deny → review → allow → unknown defaults to review.
Human-in-the-Loop
async def confirm(tool_name: str, tool_args: dict, reason: str) -> bool:
return await get_user_approval(tool_name, tool_args)
config = AgentConfig(
tool_policy=policy,
confirm_action=confirm,
approval_timeout=60,
)
Agent loop behaviour:
| Policy Decision | Behaviour |
|---|---|
allow | Execute immediately |
review + confirm_action | Call callback; execute if approved, deny if rejected |
review + no callback | Deny with error message to LLM |
deny | Return error to LLM, never execute |
Agent-Level Approval Gate (require_approval + approval_handler)
@beta— introduced for the ROADMAP P2 "Agent-Level Human-in-the-Loop" item.
Standalone agents (no AgentGraph required) can gate named tools behind an approval
handler, centralizing what @tool(requires_approval=True) does per-tool:
from selectools import Agent, AgentConfig, ApprovalRequest
from selectools.agent.config_groups import ToolConfig
def my_callback(request: ApprovalRequest) -> bool: # sync or async
print(request.preview) # e.g. send_email(to='a@b.com', subject='hi')
print(request.tool_name, request.tool_args, request.reason)
return ask_human(request) # truthy = execute, falsy = deny
config = AgentConfig(tool=ToolConfig(
require_approval=["execute_shell", "send_email"], # or "*" for all tools
approval_handler=my_callback,
approval_timeout=60,
))
Semantics:
- A tool is gated when it appears in
require_approval(or"*"is used) OR it was defined withrequires_approval=True— config and tool-level gates compose with OR. approval_handlerreceives a structuredApprovalRequest(tool_name,tool_args,reason, one-linepreview) and returns a bool. Any non-bool return value fails CLOSED — coroutines, generators, mocks, and other accidentally-truthy objects deny the call instead of approving it. When set, it services everyreviewdecision (config gate, tool flag, andToolPolicyreview rules) and takes precedence overconfirm_action.request.tool_argsis a defensive copy — mutating it never changes what the tool executes with.- Deny → the tool is not executed; the model sees a standardized
Tool '<name>' denied by approval handler: <reason>tool result and the loop continues (mirrorsconfirm_actiondenial). Denials are memoized per (tool name, args) within the run: if the model retries the identical denied call on a later iteration, the stored denial is returned without re-paging the human. Approvals are never memoized — every approved call is re-requested. - Async handlers work from
run()too: from the sync path the coroutine executes viaasyncio.runon the shared worker pool; fromarun()/astream()it is awaited natively. Both plain coroutine functions and class instances withasync def __call__are detected. Sync handlers run in an executor with contextvars propagated. - Handler exceptions and
approval_timeoutexpiries deny the call (with the error in the tool result) — the loop never crashes. Timed-out approval futures are cancelled so a still-queued handler never fires after the call was already denied. - Pool contention: blocking handlers (and async handlers invoked from sync
run()) each occupy one slot of the shared 16-workerselectools_tool_timeoutpool — the same pool that enforcestimeout_secondsfor ALL agents in the process. Sixteen concurrent approvals waiting on humans will stall tool-timeout enforcement everywhere; keep handlers prompt, setapproval_timeoutdeliberately, and prefer the async entry points for high-concurrency deployments. ToolPolicydenyrules still win unconditionally; the handler is never consulted.- Fail-fast validation:
ToolConfig(require_approval=[...])without anapproval_handlerorconfirm_actionraisesValueErrorat construction — an unapprovable gate would silently deny every call and burn iterations. UseToolPolicy(deny=[...])to hard-block tools instead.
For out-of-loop confirmation (WhatsApp/Telegram webhook turns), see
Deferred Confirmation.
See examples/108_agent_hitl.py for an offline runnable demo.
Terminal Actions
Some tools are "terminal" — the agent loop should stop after they execute, without making another LLM call.
Static declaration — tool author marks it at definition time:
@tool(terminal=True)
def present_question(question_id: int) -> str:
"""Present a question card to the student."""
return json.dumps({"action": "present_question", "id": question_id})
Dynamic condition — stop decision depends on the result content:
config = AgentConfig(
stop_condition=lambda tool_name, result: "present_question" in result,
)
After tool execution, the agent checks:
tool.terminal or (config.stop_condition and config.stop_condition(tool_name, result))
If true, the tool result becomes AgentResult.content and the loop exits immediately.
Works in run(), arun(), astream(), and parallel tool execution.
Implementation Details
Internal Architecture — Mixin Decomposition
The Agent class is composed from 4 internal mixins for maintainability:
| Mixin | File | Responsibility |
|---|---|---|
_ToolExecutorMixin | agent/_tool_executor.py | Tool execution pipeline, policy, coherence, parallel execution |
_ProviderCallerMixin | agent/_provider_caller.py | LLM provider calls, caching, retry, streaming |
_LifecycleMixin | agent/_lifecycle.py | Observer notification, fallback provider wiring |
_MemoryManagerMixin | agent/_memory_manager.py | Memory operations, session persistence, entity/KG extraction |
All public methods remain on the Agent class — the mixins are internal implementation details.
Key Attributes
class Agent:
def __init__(self, tools, provider, config, memory):
self.tools = tools # List of Tool objects
self._tools_by_name = {...} # Dict for O(1) lookup
self.provider = provider # Provider instance
self.prompt_builder = PromptBuilder() # Generates system prompts
self.parser = ToolCallParser() # Parses tool calls
self.config = config # AgentConfig
self.memory = memory # Optional ConversationMemory
self.usage = AgentUsage() # Tracks tokens/cost
self.analytics = AgentAnalytics() # Optional analytics
# Pre-build system prompt (constant per agent instance)
self._system_prompt = self.prompt_builder.build(self.tools)
# Local conversation history (reset per run if no memory)
self._history: List[Message] = []
History Management
def _append_assistant_and_tool(self, assistant_content, tool_content, tool_name, tool_result=None):
assistant_msg = Message(role=Role.ASSISTANT, content=assistant_content)
tool_msg = Message(
role=Role.TOOL,
content=tool_content,
tool_name=tool_name,
tool_result=tool_result,
)
# Append to local history
self._history.append(assistant_msg)
self._history.append(tool_msg)
# Also save to memory if available
if self.memory:
self.memory.add_many([assistant_msg, tool_msg])
Usage Tracking Convenience Methods
@property
def total_cost(self) -> float:
return self.usage.total_cost_usd
@property
def total_tokens(self) -> int:
return self.usage.total_tokens
def get_usage_summary(self) -> str:
return str(self.usage) # Pretty-printed summary
def reset_usage(self) -> None:
self.usage = AgentUsage()
Analytics Access
def get_analytics(self) -> AgentAnalytics | None:
return self.analytics # None if not enabled
Best Practices
1. Choose Appropriate Iteration Limits
# Quick interactions
config = AgentConfig(max_iterations=3)
# Complex multi-step tasks
config = AgentConfig(max_iterations=10)
# Simple single-shot (no tools expected)
config = AgentConfig(max_iterations=1)
2. Set Tool Timeouts
config = AgentConfig(
tool_timeout_seconds=30.0 # Prevent runaway tools
)
3. Use Verbose Mode for Debugging
config = AgentConfig(verbose=True)
# Prints token counts, costs, tool calls
4. Enable Cost Warnings
config = AgentConfig(
cost_warning_threshold=0.10 # Warn at \$0.10
)
5. Reset Usage Between Sessions
agent.reset_usage() # Clear token/cost counters
6. Use Memory for Conversations
# For chatbots, Q&A systems, assistants
memory = ConversationMemory(max_messages=20)
agent = Agent(..., memory=memory)
7. Enable Analytics for Optimization
config = AgentConfig(enable_analytics=True)
agent = Agent(..., config=config)
# Later: analyze which tools are used most
analytics = agent.get_analytics()
print(analytics.summary())
Performance Optimization
1. Reuse Agent Instances
# Good: Create once, use many times
agent = Agent(tools=[...], provider=provider)
for query in queries:
response = agent.run([Message(role=Role.USER, content=query)])
2. Use Async for Concurrency
# Process multiple queries concurrently
async def process_queries(queries):
agent = Agent(...)
tasks = [agent.arun([Message(role=Role.USER, content=q)]) for q in queries]
return await asyncio.gather(*tasks)
3. Limit max_tokens
# Reduce output tokens to save cost
config = AgentConfig(max_tokens=500)
4. Choose Efficient Models
# Use mini models when appropriate
config = AgentConfig(model="gpt-4o-mini") # 15x cheaper than gpt-4o
Testing
Unit Testing with Local Provider
from selectools.providers.stubs import LocalProvider
agent = Agent(
tools=[my_tool],
provider=LocalProvider(), # No API calls
config=AgentConfig(max_iterations=2, model="local")
)
response = agent.run([Message(role=Role.USER, content="test")])
Recording Observer Events
from selectools import AgentObserver
def test_agent_with_observer():
called = []
class Recorder(AgentObserver):
def on_tool_start(self, run_id, call_id, tool_name, tool_args):
called.append((tool_name, tool_args))
config = AgentConfig(observers=[Recorder()])
agent = Agent(tools=[...], provider=provider, config=config)
agent.run([...])
assert len(called) > 0
assert called[0][0] == "expected_tool"
Common Pitfalls
1. Forgetting to Set API Keys
# ❌ This will raise ProviderConfigurationError
provider = OpenAIProvider() # OPENAI_API_KEY not set
# ✅ Set via env var
export OPENAI_API_KEY="sk-..."
# ✅ Or pass directly
provider = OpenAIProvider(api_key="sk-...")
2. Infinite Loops
# ❌ If LLM keeps calling tools that fail
config = AgentConfig(max_iterations=1000) # Dangerous!
# ✅ Use reasonable limits
config = AgentConfig(max_iterations=6) # Default is safe
3. Not Handling Tool Errors
# Agent handles tool errors gracefully by default
# But tools should still validate inputs and provide helpful errors
@tool(description="Divide two numbers")
def divide(a: float, b: float) -> str:
if b == 0:
raise ValueError("Cannot divide by zero")
return str(a / b)
Related Examples
| # | Script | Description |
|---|---|---|
| 01 | 01_hello_world.py | Your first agent with LocalProvider |
| 06 | 06_async_agent.py | Async agent with arun() |
| 24 | 24_traces_and_reasoning.py | Execution traces and reasoning visibility |
| 25 | 25_provider_fallback.py | FallbackProvider with circuit breaker |
| 26 | 26_batch_processing.py | Concurrent multi-prompt batch execution |
Further Reading
- Tools Module - Tool definition and validation
- Dynamic Tools Module - Dynamic tool loading and runtime management
- Parser Module - Tool call parsing details
- Providers Module - Provider implementations and FallbackProvider
- Memory Module - Conversation memory and tool-pair-aware trimming
- Sessions Module - Persistent session storage with 4 backends
- Entity Memory Module - Named entity extraction and tracking
- Knowledge Graph Module - Relationship triple extraction
- Knowledge Memory Module - Cross-session durable memory
- Usage Module - Cost tracking
- Architecture - System-level overview including new modules
Next Steps: Understand how tools are defined and validated in the Tools Module.