Streaming and Performance Module
June 14, 2026
Import: from selectools import Agent
Stability: stable
```python
import asyncio
from selectools import Agent, AgentConfig, Message, Role, tool
from selectools.providers.stubs import LocalProvider

@tool(description="Search the web")
def search(query: str) -> str:
    return f"Results for '{query}': Python is a popular programming language."

provider = LocalProvider()
agent = Agent(
    tools=[search],
    provider=provider,
    config=AgentConfig(max_iterations=2),
)

async def main():
    async for item in agent.astream(
        [Message(role=Role.USER, content="Search for Python tutorials")]
    ):
        # item is either StreamChunk (text) or AgentResult (final)
        print(type(item).__name__, getattr(item, "content", "")[:80])

asyncio.run(main())
```
!!! tip "See Also"
    - Agent - Agent lifecycle, hooks, and configuration
    - Providers - Provider implementations and streaming support
Directory: src/selectools/agent/
Key Types: StreamChunk, AgentResult (from selectools.types)
Table of Contents
- Overview
- Quick Start
- E2E Streaming (v0.11.0)
- Parallel Tool Execution (v0.11.0)
- Native Function Calling (v0.10.0)
- Routing Mode (v0.10.0)
- Context Propagation (v0.10.0)
- AgentResult (v0.9.0)
- Custom System Prompt (v0.9.0)
- Agent.reset() (v0.9.0)
- Performance Comparison
- Practical Examples
- Best Practices
- Troubleshooting
- Multimodal Messages (v0.21.0)
- Further Reading
Overview
The selectools library provides streaming and performance features for real-time token delivery, concurrent tool execution, and programmatic inspection of agent behavior. They range from token-level streaming (astream()) and routing without execution (routing_only) to native function calling and context-preserving tool execution.
Feature Summary
| Feature | Version | Purpose |
|---|---|---|
| E2E Streaming | v0.11.0 | Token-by-token output with native tool call support |
| Parallel Tool Execution | v0.11.0 | Run multiple tools concurrently in a single iteration |
| Native Function Calling | v0.10.0 | Provider-native tool APIs, no regex parsing |
| Routing Mode | v0.10.0 | Select a tool without executing it (classification, intent routing) |
| Context Propagation | v0.10.0 | Preserve tracing and auth when running sync tools in executors |
| AgentResult | v0.9.0 | Structured return with message, tool metadata, iterations |
| Custom System Prompt | v0.9.0 | Inject domain instructions via AgentConfig |
| Agent.reset() | v0.9.0 | Clear state for clean reuse across requests |
Import Paths
```python
from selectools import Agent, AgentConfig, Message, Role
from selectools.types import StreamChunk, AgentResult
```
Quick Start
Streaming with astream()
```python
import asyncio
from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider, tool
from selectools.types import StreamChunk, AgentResult

# Placeholder tool so the example is complete; replace with a real search
@tool(description="Search the web")
def search_tool(query: str) -> str:
    return f"Results for '{query}'"

agent = Agent(
    tools=[search_tool],
    provider=OpenAIProvider(),
    config=AgentConfig(max_iterations=3),
)

async def main():
    async for item in agent.astream([Message(role=Role.USER, content="Search for Python tutorials")]):
        if isinstance(item, StreamChunk):
            print(item.content, end="", flush=True)
        elif isinstance(item, AgentResult):
            print(f"\n\nDone in {item.iterations} iterations")
            if item.tool_calls:
                print(f"Tools used: {[tc.tool_name for tc in item.tool_calls]}")

asyncio.run(main())
```
E2E Streaming (v0.11.0)
Agent.astream()
Agent.astream(messages) returns an AsyncGenerator yielding Union[StreamChunk, AgentResult]:
- StreamChunk — Intermediate content chunks (text and/or tool calls)
- AgentResult — Final result, yielded once when the agent completes
Accumulate deltas, not the final result. The terminal AgentResult.content is the whole answer, not a delta. Always discriminate by type: a naive text += item.content over every yielded item appends the full answer on top of the deltas and gives you the response twice:

```python
text = ""
async for item in agent.astream("..."):
    if isinstance(item, StreamChunk):
        text += item.content or ""  # deltas only
    else:
        result = item  # AgentResult: full answer + metadata
```
StreamChunk
```python
@dataclass
class StreamChunk:
    content: str = ""  # Text delta
    role: Role = Role.ASSISTANT
    tool_calls: Optional[List[ToolCall]] = None  # Optional; present when chunk contains tool invocations
```

- content: The text portion of this chunk
- tool_calls: Optional list of ToolCall objects when the LLM emits tool invocations during streaming
AgentResult as Final Item
The last item yielded by astream() is always an AgentResult. It carries:
- message: Final assistant response
- tool_name: Last tool called (or None)
- tool_args: Args for last tool
- iterations: Number of loop iterations
- tool_calls: All ToolCall objects from the run
Provider Protocol
Providers implement astream() yielding Union[str, ToolCall]:
- Text deltas: Raw str chunks (token-by-token)
- Tool calls: Complete ToolCall objects when ready (native function calling)
```mermaid
graph LR
    P["Provider.astream()"] --> A["yield 'Hello' (str)"]
    P --> B["yield ' ' (str)"]
    P --> C["yield 'world' (str)"]
    P --> D["yield ToolCall(...) (tool invocation)"]
    P --> E["yield '!' (str)"]
```
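A provider stream is straightforward to consume once each item is dispatched on its type. The sketch below fakes the stream from the diagram with an async generator. ToolCallSketch is a hypothetical stand-in for selectools' ToolCall that keeps only the tool_name and parameters attributes used elsewhere on this page.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Tuple, Union


@dataclass
class ToolCallSketch:
    """Hypothetical stand-in for ToolCall."""
    tool_name: str
    parameters: Dict[str, Any] = field(default_factory=dict)


async def provider_astream() -> AsyncIterator[Union[str, ToolCallSketch]]:
    # Mirrors the diagram: text deltas with one complete tool call in between.
    yield "Hello"
    yield " "
    yield "world"
    yield ToolCallSketch("search", {"query": "python"})
    yield "!"


async def consume() -> Tuple[str, List[ToolCallSketch]]:
    text_parts: List[str] = []
    tool_calls: List[ToolCallSketch] = []
    async for item in provider_astream():
        if isinstance(item, str):
            text_parts.append(item)   # token-by-token delta
        else:
            tool_calls.append(item)   # complete call, ready to execute
    return "".join(text_parts), tool_calls


text, calls = asyncio.run(consume())
print(text)                          # Hello world!
print([c.tool_name for c in calls])  # ['search']
```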
Fallback Chain
When a provider does not support streaming:
```mermaid
flowchart TD
    A["astream() requested"] --> B{"Provider has astream()?"}
    B -- Yes --> C["Use it"]
    B -- No --> D{"Provider has acomplete()?"}
    D -- Yes --> E["Call it, yield full response\nas single StreamChunk"]
    D -- No --> F["Run complete() in\nThreadPoolExecutor"]
```
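The decision order in the flowchart can be sketched with plain duck typing. This is an illustration under assumptions, not selectools' implementation: providers are checked with hasattr(), each method takes a single prompt argument, and results are yielded as plain strings rather than wrapped in StreamChunk.

```python
import asyncio
from typing import AsyncIterator, List


async def stream_with_fallback(provider, prompt: str) -> AsyncIterator[str]:
    """Sketch of the fallback chain: astream() -> acomplete() -> complete()."""
    if hasattr(provider, "astream"):
        async for delta in provider.astream(prompt):
            yield delta
    elif hasattr(provider, "acomplete"):
        # The whole response arrives at once and is yielded as one item.
        yield await provider.acomplete(prompt)
    else:
        # Blocking provider: run complete() in the loop's default ThreadPoolExecutor.
        loop = asyncio.get_running_loop()
        yield await loop.run_in_executor(None, provider.complete, prompt)


class StreamingProvider:
    async def astream(self, prompt: str):
        for token in ["a", "b", "c"]:
            yield token


class AsyncOnlyProvider:
    async def acomplete(self, prompt: str) -> str:
        return "abc"


class SyncOnlyProvider:
    def complete(self, prompt: str) -> str:
        return "abc"


async def collect(provider) -> List[str]:
    return [d async for d in stream_with_fallback(provider, "hi")]


for p in (StreamingProvider(), AsyncOnlyProvider(), SyncOnlyProvider()):
    print(type(p).__name__, asyncio.run(collect(p)))
```

Only the first provider produces incremental output; the other two yield the complete text as a single item, which is why streaming can appear to stall until the response is finished.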
Tool Call Accumulation and Multi-Iteration
- Accumulation: Tool calls are accumulated as they stream in from the provider.
- Execution: When all tool calls in a response are ready, they are executed (in parallel if parallel_tool_execution=True).
- Continue: Results are appended to history; streaming continues with the next LLM call.
- Final result: When the LLM produces a final text response with no tool calls, AgentResult is yielded.
```mermaid
graph TD
    subgraph Iteration1["Iteration 1"]
        A1["StreamChunk('Searching...')"] --> A2["StreamChunk(tool_calls=[...])"]
        A2 --> A3["Tools executed"]
    end
    subgraph Iteration2["Iteration 2"]
        B1["StreamChunk('Here are the results:')"] --> B2["StreamChunk('- Result 1')"]
        B2 --> B3["..."]
        B3 --> B4["AgentResult(iterations=2, tool_calls=[...])"]
    end
    Iteration1 --> Iteration2
```
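The multi-iteration flow can be modeled with a scripted provider. In this sketch, Call, SCRIPT, TOOLS, fake_llm(), and agent_loop() are hypothetical names; events are (kind, payload) tuples instead of StreamChunk and AgentResult objects, and the scripted provider ignores the history it receives.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Tuple, Union


@dataclass
class Call:
    """Hypothetical stand-in for ToolCall."""
    tool_name: str
    parameters: Dict[str, Any] = field(default_factory=dict)


# Scripted provider output: one list of streamed items per LLM call.
SCRIPT: List[List[Union[str, Call]]] = [
    ["Searching...", Call("search", {"query": "python"})],
    ["Here are the results:", " - Result 1"],
]

TOOLS = {"search": lambda query: f"results for {query}"}


async def fake_llm(call_index: int, history: List[str]) -> AsyncIterator[Union[str, Call]]:
    # A real provider would read `history`; this script does not.
    for item in SCRIPT[call_index]:
        yield item


async def agent_loop(max_iterations: int = 3) -> AsyncIterator[Tuple[str, Any]]:
    history: List[str] = []
    all_calls: List[Call] = []
    for iteration in range(1, max_iterations + 1):
        pending: List[Call] = []
        async for item in fake_llm(iteration - 1, history):
            if isinstance(item, Call):
                pending.append(item)   # accumulate tool calls as they arrive
            else:
                yield ("chunk", item)  # forward text deltas immediately
        if not pending:
            # Final text with no tool calls: emit the terminal result.
            yield ("result", {"iterations": iteration, "tool_calls": all_calls})
            return
        for call in pending:
            history.append(TOOLS[call.tool_name](**call.parameters))  # execute, record
        all_calls.extend(pending)


async def main() -> List[Tuple[str, Any]]:
    return [event async for event in agent_loop()]


events = asyncio.run(main())
for kind, payload in events:
    print(kind, payload)
```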
Parallel Tool Execution (v0.11.0)
Overview
When the LLM requests multiple tool calls in a single response (common with native function calling), the agent executes them concurrently instead of sequentially.
Configuration
```python
config = AgentConfig(
    parallel_tool_execution=True  # Default: enabled
)
```
Set to False for strictly sequential execution.
Async Execution
Uses asyncio.gather() for concurrent tool runs:
```python
results = await asyncio.gather(*[run_tool(tc) for tc in tool_calls])
```
Sync Execution
Uses ThreadPoolExecutor with one worker per tool:
```python
with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
    futures = [pool.submit(run_tool, tc) for tc in tool_calls]
    results = [f.result() for f in futures]
```
Guarantees
| Guarantee | Description |
|---|---|
| Result ordering | Results appended to history in original request order |
| Error isolation | One tool failure does not block others |
| Hook invocation | on_tool_start, on_tool_end, on_tool_error fire for every tool |
| Single-tool optimization | Only one tool called → sequential path, no executor overhead |
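The ordering, error-isolation, and single-tool guarantees can be demonstrated with asyncio alone. The sketch below is not selectools' executor: run_tool(), execute(), and the dict-shaped results are illustrative assumptions, and each tool is a zero-argument coroutine function.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, List, Tuple

ToolFn = Callable[[], Awaitable[Any]]


async def run_tool(name: str, fn: ToolFn) -> Dict[str, Any]:
    # Error isolation: a failure becomes an error result instead of
    # propagating out of gather() and losing the other results.
    try:
        return {"tool": name, "ok": True, "value": await fn()}
    except Exception as exc:
        return {"tool": name, "ok": False, "value": str(exc)}


async def execute(calls: List[Tuple[str, ToolFn]]) -> List[Dict[str, Any]]:
    if len(calls) == 1:
        # Single-tool path: await directly, no gather() needed.
        name, fn = calls[0]
        return [await run_tool(name, fn)]
    # gather() returns results in argument order, not completion order.
    return list(await asyncio.gather(*(run_tool(n, f) for n, f in calls)))


def delayed(value: str, delay: float) -> ToolFn:
    async def _fn() -> str:
        await asyncio.sleep(delay)
        return value
    return _fn


async def boom() -> str:
    raise RuntimeError("upstream timeout")


calls = [("slow", delayed("A", 0.3)), ("broken", boom), ("fast", delayed("C", 0.2))]
start = time.perf_counter()
results = asyncio.run(execute(calls))
elapsed = time.perf_counter() - start
print([r["tool"] for r in results])  # ['slow', 'broken', 'fast']
print(f"{elapsed:.2f}s")             # close to 0.3s; run one by one it would be about 0.5s
```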
Native Function Calling (v0.10.0)
Overview
selectools uses provider-native tool APIs instead of regex parsing:
- OpenAI: tools and tool_calls in Chat Completions (functions in the legacy API)
- Anthropic: tool_use content blocks
- Gemini: function calling (function_call parts in responses)
Message.tool_calls
Responses carry structured ToolCall objects on Message.tool_calls:
```python
response = provider.complete(...)
msg = response[0]
if msg.tool_calls:
    for tc in msg.tool_calls:
        print(f"Tool: {tc.tool_name}, Args: {tc.parameters}")
```
No Regex Parsing
- Providers return ToolCall objects directly.
- No text-based patterns such as TOOL_CALL {...}.
Fallback
When a provider returns plain text only (no native tool format), the agent falls back to ToolCallParser regex parsing.
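To make the fallback concrete, here is a minimal regex parser for a call embedded in text. The TOOL_CALL {json} format with tool_name and parameters keys is an assumption for illustration; ToolCallParser's actual syntax and API are not documented on this page.

```python
import json
import re
from typing import Any, Dict, Optional

# Hypothetical text format modeled on the TOOL_CALL {...} pattern mentioned
# above; the real ToolCallParser's format and API may differ.
TOOL_CALL_RE = re.compile(r"TOOL_CALL\s*(\{.*\})", re.DOTALL)


def parse_tool_call(text: str) -> Optional[Dict[str, Any]]:
    match = TOOL_CALL_RE.search(text)
    if match is None:
        return None  # plain answer: no tool requested
    try:
        payload = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None  # malformed JSON: treat as a plain answer
    if not isinstance(payload, dict) or "tool_name" not in payload:
        return None
    return {"tool_name": payload["tool_name"], "parameters": payload.get("parameters", {})}


reply = 'Let me look that up. TOOL_CALL {"tool_name": "search", "parameters": {"query": "python"}}'
print(parse_tool_call(reply))  # {'tool_name': 'search', 'parameters': {'query': 'python'}}
print(parse_tool_call("Python is a programming language."))  # None
```

The greedy {.*} match runs to the last closing brace in the reply, so this sketch assumes the call is the final JSON object in the text. Native tool calls avoid this class of ambiguity entirely.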
Routing Mode (v0.10.0)
Overview
AgentConfig(routing_only=True) makes the agent choose a tool but not run it. Useful for classification, intent routing, and tool selection.
Configuration
```python
config = AgentConfig(routing_only=True)
agent = Agent(tools=[...], provider=provider, config=config)
```
Return Value
Returns AgentResult with:
- tool_name: Selected tool
- tool_args: Parsed arguments
- message: Assistant message containing the selection
- trace: Execution trace (LLM call + tool selection steps)
No tool execution; only one LLM call. Observer events on_iteration_start and on_iteration_end both fire for the single iteration, along with on_run_start/on_run_end.
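Routing mode amounts to stopping right after the selection step. In this sketch, fake_llm_choice() replaces the single LLM call with a keyword rule and RoutingResult is a hypothetical subset of AgentResult; the executed list shows that the chosen tool is never called.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

executed: List[str] = []  # records any real tool execution


@dataclass
class RoutingResult:
    """Hypothetical subset of the AgentResult fields listed above."""
    tool_name: Optional[str]
    tool_args: Dict[str, Any] = field(default_factory=dict)
    iterations: int = 1


def support_tool(topic: str) -> str:
    executed.append(topic)
    return f"ticket opened about {topic}"


def search_tool(query: str) -> str:
    executed.append(query)
    return f"results for {query}"


TOOLS: Dict[str, Callable[..., str]] = {"support_tool": support_tool, "search_tool": search_tool}


def fake_llm_choice(prompt: str) -> Tuple[str, Dict[str, Any]]:
    # Stand-in for the single LLM call: a keyword rule instead of a model.
    if "bill" in prompt.lower():
        return "support_tool", {"topic": "billing"}
    return "search_tool", {"query": prompt}


def route_only(prompt: str) -> RoutingResult:
    name, args = fake_llm_choice(prompt)
    if name not in TOOLS:
        return RoutingResult(tool_name=None)
    # Report the selection and stop: TOOLS[name] is never invoked.
    return RoutingResult(tool_name=name, tool_args=args)


result = route_only("I need help with my bill")
print(result.tool_name, result.tool_args)  # support_tool {'topic': 'billing'}
print(executed)                            # []
```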
Use Cases
| Use Case | Example |
|---|---|
| Classification | Route to sales vs support vs billing |
| Intent detection | Choose between search, calculator, or Q&A |
| Tool preselection | Decide which tools to enable before full execution |
Example
```python
from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import AgentResult

# search_tool, calculator_tool, and support_tool are @tool-decorated functions defined elsewhere
config = AgentConfig(routing_only=True)
agent = Agent(
    tools=[search_tool, calculator_tool, support_tool],
    provider=OpenAIProvider(),
    config=config,
)
result = agent.run([Message(role=Role.USER, content="I need help with my bill")])

# Inspect the routing decision without executing the tool.
# Expected for this prompt; the actual choice depends on the model.
assert result.tool_name == "support_tool"
assert "bill" in str(result.tool_args).lower()  # also matches "billing"
```
Context Propagation (v0.10.0)
Overview
When sync tools run inside a ThreadPoolExecutor (e.g. async agent calling sync tools), contextvars.copy_context() is used so request-scoped state (tracing, auth, etc.) is preserved.
How It Works
```python
# In tools/base.py - sync tool execution from async context
context = contextvars.copy_context()
func_with_args = functools.partial(self.function, **call_args)
result = await loop.run_in_executor(executor, context.run, func_with_args)
```
Preserved State
- OpenTelemetry tracing spans
- Auth tokens
- Request IDs
- Other contextvars values
Async Tools
Async tools run in the same event loop as the agent; no executor, so context is already intact.
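The difference copy_context() makes can be observed with the standard library alone. This sketch uses a hypothetical request_id variable and the event loop's default executor; it is not the tools/base.py code itself.

```python
import asyncio
import contextvars
import functools

request_id = contextvars.ContextVar("request_id", default="<unset>")


def sync_tool(label: str) -> str:
    # A blocking tool that reads request-scoped state.
    return f"{label}: {request_id.get()}"


async def handle_request():
    request_id.set("req-42")
    loop = asyncio.get_running_loop()
    # Plain executor call: nothing copies this task's context to the worker.
    plain = await loop.run_in_executor(None, functools.partial(sync_tool, "plain"))
    # Snapshot the context and run the tool inside it.
    ctx = contextvars.copy_context()
    copied = await loop.run_in_executor(
        None, ctx.run, functools.partial(sync_tool, "copied")
    )
    return plain, copied


plain, copied = asyncio.run(handle_request())
print(plain)   # typically "plain: <unset>" on standard CPython builds
print(copied)  # copied: req-42
```

asyncio.to_thread() (Python 3.9+) performs the same copy_context() wrapping internally, which makes it a convenient choice in your own async code.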
AgentResult (v0.9.0)
Overview
agent.run() and agent.arun() return AgentResult instead of Message, enabling programmatic inspection of tool usage and iterations.
Fields
| Field | Type | Description |
|---|---|---|
| message | Message | Final assistant response |
| tool_name | Optional[str] | Last tool called, or None |
| tool_args | Dict[str, Any] | Args for last tool call |
| iterations | int | Number of agent loop iterations |
| tool_calls | List[ToolCall] | All tool calls in order |
Backward Compatibility
- result.content → result.message.content
- result.role → result.message.role
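One way to provide this kind of backward compatibility is read-only properties that delegate to the wrapped message. The classes below (Msg, ResultSketch) are hypothetical and only mirror the fields in the table; they are not selectools' AgentResult source.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Msg:
    """Hypothetical stand-in for Message."""
    role: str
    content: str


@dataclass
class ResultSketch:
    message: Msg
    tool_name: Optional[str] = None
    tool_args: Dict[str, Any] = field(default_factory=dict)
    iterations: int = 0
    tool_calls: List[Any] = field(default_factory=list)

    @property
    def content(self) -> str:
        # Code written against Message.content keeps working.
        return self.message.content

    @property
    def role(self) -> str:
        return self.message.role


r = ResultSketch(
    message=Msg("assistant", "It is sunny in Tokyo."),
    tool_name="get_weather",
    tool_args={"location": "Tokyo"},
    iterations=2,
)
print(r.content)  # It is sunny in Tokyo.
print(r.role)     # assistant
```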
Example
```python
result = agent.run([Message(role=Role.USER, content="What's the weather in Tokyo?")])
print(result.content)          # Final text
print(result.tool_name)        # e.g. "get_weather"
print(result.tool_args)        # e.g. {"location": "Tokyo"}
print(result.iterations)       # e.g. 2
print(len(result.tool_calls))  # Number of tools invoked
```
Custom System Prompt (v0.9.0)
Overview
AgentConfig(system_prompt="...") injects domain instructions before tool schemas. They persist across iterations.
Configuration
```python
config = AgentConfig(
    system_prompt="You are a medical assistant. Only provide information you are confident about."
)
agent = Agent(tools=[...], provider=provider, config=config)
```
When to Use
- Domain constraints (medical, legal, etc.)
- Tone and persona
- Guardrails and safety
- Language or formatting rules
Example
```python
config = AgentConfig(
    system_prompt="""You are a financial advisor.
- Never guarantee returns.
- Always recommend consulting a licensed professional.
- Use clear, non-technical language."""
)
agent = Agent(tools=[lookup_stock, get_news], provider=provider, config=config)
```
Agent.reset() (v0.9.0)
Overview
Agent.reset() clears history, usage stats, analytics, and memory so the same agent instance can be reused across requests.
What It Clears
- _history: Message history
- usage: Token/cost stats
- analytics: If enabled
- memory: If ConversationMemory is set, calls memory.clear()
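The reset contract can be illustrated with a toy agent. AgentSketch and MemorySketch are hypothetical stand-ins, and the usage dict and analytics counter are simplified assumptions, but reset() clears the same four pieces of state listed above and calls memory.clear() when a memory object is attached.

```python
from typing import Dict, List, Optional


class MemorySketch:
    """Hypothetical stand-in for ConversationMemory."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def clear(self) -> None:
        self.messages.clear()


class AgentSketch:
    """Hypothetical agent holding per-request state."""

    def __init__(self, memory: Optional[MemorySketch] = None, enable_analytics: bool = False) -> None:
        self.memory = memory
        self._history: List[str] = []
        self.usage: Dict[str, int] = {"prompt_tokens": 0}
        self.analytics: Optional[Dict[str, int]] = {} if enable_analytics else None

    def run(self, text: str) -> str:
        self._history.append(text)
        self.usage["prompt_tokens"] += len(text.split())  # crude token count
        if self.analytics is not None:
            self.analytics["runs"] = self.analytics.get("runs", 0) + 1
        if self.memory is not None:
            self.memory.messages.append(text)
        return f"handled: {text}"

    def reset(self) -> None:
        self._history.clear()
        self.usage = {"prompt_tokens": 0}
        if self.analytics is not None:
            self.analytics = {}
        if self.memory is not None:
            self.memory.clear()


agent = AgentSketch(memory=MemorySketch(), enable_analytics=True)
for request in ["first request", "second request"]:
    agent.reset()  # nothing from the previous request leaks in
    agent.run(request)
print(agent._history)         # ['second request']
print(agent.memory.messages)  # ['second request']
```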
Pattern
```python
agent = Agent(tools=[...], provider=provider, memory=ConversationMemory())

# Create once, reset between requests
for user_request in requests:
    agent.reset()
    result = agent.run([Message(role=Role.USER, content=user_request)])
```
Performance Comparison
Sequential vs Parallel Tool Execution
| Scenario | Sequential | Parallel | Speedup |
|---|---|---|---|
| 3 tools × 0.15s each | ~0.45s | ~0.15s | ~3× |
| 5 tools × 0.2s each | ~1.0s | ~0.2s | ~5× |
| 1 tool | 0.15s | 0.15s | 1× (no overhead) |
Benchmark Example
```python
import time
from selectools import Agent, AgentConfig, Message, Role, tool

@tool(description="Simulate slow API")
def slow_api(delay: float) -> str:
    time.sleep(delay)
    return f"Done after {delay}s"

# provider: any configured provider instance (not defined in this snippet)
agent_parallel = Agent(
    tools=[slow_api],
    provider=provider,
    config=AgentConfig(parallel_tool_execution=True, max_iterations=2),
)
agent_sequential = Agent(
    tools=[slow_api],
    provider=provider,
    config=AgentConfig(parallel_tool_execution=False, max_iterations=2),
)

# Tool execution time for a prompt that triggers 3 calls of slow_api(0.15),
# excluding LLM latency:
#   parallel:   ~0.15s
#   sequential: ~0.45s
```
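The table's numbers can be reproduced without an LLM by timing the tool calls directly. This stdlib sketch runs the same slow_api body sequentially and through a ThreadPoolExecutor with one worker per call; it measures tool time only, not provider latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def slow_api(delay: float) -> str:
    # Same body as the slow_api tool above, without the @tool decorator.
    time.sleep(delay)
    return f"Done after {delay}s"


def run_sequential(delays: List[float]) -> List[str]:
    return [slow_api(d) for d in delays]


def run_parallel(delays: List[float]) -> List[str]:
    # One worker per call; map() yields results in input order.
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        return list(pool.map(slow_api, delays))


delays = [0.15, 0.15, 0.15]
timings: Dict[str, float] = {}
outputs: Dict[str, List[str]] = {}
runners: Dict[str, Callable[[List[float]], List[str]]] = {
    "sequential": run_sequential,
    "parallel": run_parallel,
}
for name, runner in runners.items():
    start = time.perf_counter()
    outputs[name] = runner(delays)
    timings[name] = time.perf_counter() - start
    print(f"{name}: {timings[name]:.2f}s")
```

Because time.sleep() releases the GIL, threads overlap the waits; CPU-bound tools would not speed up the same way under a thread pool.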
Practical Examples
Routing Mode for Intent Classification
```python
config = AgentConfig(routing_only=True)
agent = Agent(
    tools=[sales_tool, support_tool, billing_tool],
    provider=provider,
    config=config,
)
intent = agent.run([Message(role=Role.USER, content=user_message)])

# route_to_sales_team, create_support_ticket, forward_to_billing: your own handlers
if intent.tool_name == "sales_tool":
    route_to_sales_team(intent.tool_args)
elif intent.tool_name == "support_tool":
    create_support_ticket(intent.tool_args)
else:
    forward_to_billing(intent.tool_args)
```
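In the chain above, the else branch forwards every unmatched result to billing, including a run where no tool was selected (tool_name is None). A dispatch table keeps the mapping in one place and makes the no-selection case explicit. The handlers here, including handle_unrouted, are hypothetical placeholders that only record their calls.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

handled: List[Tuple[str, Dict[str, Any]]] = []


def route_to_sales_team(args: Dict[str, Any]) -> None:
    handled.append(("sales", args))


def create_support_ticket(args: Dict[str, Any]) -> None:
    handled.append(("support", args))


def forward_to_billing(args: Dict[str, Any]) -> None:
    handled.append(("billing", args))


def handle_unrouted(args: Dict[str, Any]) -> None:
    handled.append(("unrouted", args))  # e.g. ask a clarifying question


HANDLERS: Dict[str, Callable[[Dict[str, Any]], None]] = {
    "sales_tool": route_to_sales_team,
    "support_tool": create_support_ticket,
    "billing_tool": forward_to_billing,
}


def dispatch(tool_name: Optional[str], tool_args: Dict[str, Any]) -> None:
    # Unknown names and None both fall through to the explicit fallback.
    handler = HANDLERS.get(tool_name) if tool_name else None
    (handler or handle_unrouted)(tool_args)


dispatch("billing_tool", {"invoice": "INV-1"})
dispatch(None, {})
print(handled)  # [('billing', {'invoice': 'INV-1'}), ('unrouted', {})]
```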
AgentResult Inspection for Analytics
```python
result = agent.run(messages)
if result.tool_calls:
    for tc in result.tool_calls:
        log_tool_usage(tc.tool_name, tc.parameters)
if result.iterations > 3:
    alert_complex_conversation()
```
System Prompt for Domain Experts
```python
config = AgentConfig(
    system_prompt="You are a Python expert. Prefer type hints and modern syntax. Suggest tests when relevant.",
    max_iterations=5,
)
agent = Agent(tools=[search_docs, run_code], provider=provider, config=config)
```
Best Practices
1. Use astream() for Responsive UX
```python
async for item in agent.astream(messages):
    if isinstance(item, StreamChunk):
        await websocket.send_json({"type": "chunk", "content": item.content})
    elif isinstance(item, AgentResult):
        await websocket.send_json({"type": "done", "iterations": item.iterations})
```
2. Keep parallel_tool_execution Enabled
Default is True; disable only when tool ordering or side effects require sequential execution.
3. Prefer routing_only for Classification
Use routing mode for cheap classification instead of a full agent run.
4. Reuse Agents with reset()
```python
agent = Agent(...)
for req in queue:
    agent.reset()
    result = agent.run(req)
```
5. Use AgentResult for Observability
Use result.tool_calls and result.iterations for logging and monitoring.
Troubleshooting
Streaming Yields Nothing Until Complete
Cause: Provider lacks astream(); agent falls back to acomplete() and yields a single chunk.
Fix: Use a provider that implements astream() (e.g. OpenAI, Anthropic, Gemini).
Parallel Tools Seem Sequential
Cause: parallel_tool_execution=False or only one tool per response.
Fix: Set AgentConfig(parallel_tool_execution=True) and use prompts that trigger multiple tools.
Context Lost in Sync Tools
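Cause: A selectools version older than v0.10.0, or sync tools run through a custom executor that does not copy the context.
Fix: Upgrade to v0.10.0 or later, where sync tools called from an async agent run under contextvars.copy_context(). In a custom executor, wrap the call with context.run as in How It Works above.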
routing_only Still Executes Tools
Cause: The Agent was built without routing_only=True, for example with a default AgentConfig() or with a routing config that was created but never passed in.
Fix: Pass config=AgentConfig(routing_only=True) to the Agent constructor.
Multimodal Messages (v0.21.0)
Stability: beta
Selectools supports multimodal messages through the ContentPart, image_message(), and text_content() helpers. These let you send images alongside text to vision-capable models.
ContentPart
```python
from selectools.types import ContentPart, Message, Role

# Build a message with multiple content parts
parts = [
    ContentPart(type="text", text="What's in these images?"),
    ContentPart(type="image_url", image_url="https://example.com/photo.jpg"),
    ContentPart(type="image_base64", image_base64="...", media_type="image/png"),
]
msg = Message(role=Role.USER, content="", content_parts=parts)
```
image_message() Helper
```python
from selectools.types import image_message

# From a URL
msg = image_message("https://example.com/photo.jpg", prompt="Describe this image.")

# From a local file path
msg = image_message("/path/to/photo.png", prompt="What do you see?")
```
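image_message() accepts either a URL or a local path. One plausible way to make that decision is sketched below with plain dicts that mirror the ContentPart fields shown earlier (type, text, image_url, image_base64, media_type); the helper name image_parts and its URL-prefix rule are assumptions, not the library's code.

```python
import base64
import mimetypes
import os
import tempfile
from pathlib import Path
from typing import Dict, List


def image_parts(source: str, prompt: str) -> List[Dict[str, str]]:
    """Hypothetical helper: content-part dicts from a URL or a local file."""
    parts = [{"type": "text", "text": prompt}]
    if source.startswith(("http://", "https://")):
        # Remote image: pass the URL through untouched.
        parts.append({"type": "image_url", "image_url": source})
    else:
        # Local file: inline the bytes as base64 with a guessed media type.
        media_type = mimetypes.guess_type(source)[0] or "application/octet-stream"
        data = base64.b64encode(Path(source).read_bytes()).decode("ascii")
        parts.append({"type": "image_base64", "image_base64": data, "media_type": media_type})
    return parts


# Usage with a throwaway file standing in for a real image.
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
    f.write(b"\x89PNG fake bytes")
local = image_parts(f.name, "What do you see?")
remote = image_parts("https://example.com/photo.jpg", "Describe this image.")
os.remove(f.name)
print(local[1]["media_type"])  # image/png
print(remote[1]["type"])       # image_url
```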
text_content() Helper
Extract text from a message regardless of format:
```python
from selectools.types import text_content

# Works with both plain content and content_parts messages
text = text_content(message)
```
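Conceptually, text extraction only has to skip non-text parts and fall back to plain content. A sketch over plain dicts follows; the function name text_of and the newline joiner are assumptions, and the real text_content() may join parts differently.

```python
from typing import Dict, List, Optional


def text_of(content: str, content_parts: Optional[List[Dict[str, str]]] = None) -> str:
    """Hypothetical text_content(): prefer text parts, else plain content."""
    if content_parts:
        texts = [p["text"] for p in content_parts if p.get("type") == "text" and p.get("text")]
        return "\n".join(texts)
    return content


parts = [
    {"type": "text", "text": "What's in these images?"},
    {"type": "image_url", "image_url": "https://example.com/photo.jpg"},
]
print(text_of("", parts))        # What's in these images?
print(text_of("Plain message"))  # Plain message
```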
Provider Handling
When streaming with astream(), providers serialize content_parts into their native multimodal format (OpenAI content arrays, Anthropic content blocks, Gemini parts). The streaming pipeline handles content parts transparently: text deltas and tool calls are yielded as usual.
Further Reading
- Agent Module - Agent lifecycle, hooks, configuration
- Tools Module - Tool definition and validation
- Providers Module - Provider implementations and streaming
- Memory Module - Conversation memory and reset()
Next Steps: Enable streaming with agent.astream() and optimize tool-heavy workflows with parallel_tool_execution=True.
Related Examples
| # | Script | Description |
|---|---|---|
| 07 | 07_streaming_tools.py | Token-level async streaming with tool call support |
| 08 | 08_streaming_parallel.py | Streaming with parallel tool execution |