Streaming and Performance Module

June 14, 2026

Import: from selectools import Agent

Stability: stable

import asyncio
from selectools import Agent, AgentConfig, Message, Role, tool
from selectools.providers.stubs import LocalProvider

@tool(description="Search the web")
def search(query: str) -> str:
    return f"Results for '{query}': Python is a popular programming language."

provider = LocalProvider()
agent = Agent(
    tools=[search],
    provider=provider,
    config=AgentConfig(max_iterations=2),
)


async def main():
    async for item in agent.astream(
        [Message(role=Role.USER, content="Search for Python tutorials")]
    ):
        # item is either StreamChunk (text) or AgentResult (final)
        print(type(item).__name__, getattr(item, "content", "")[:80])


asyncio.run(main())

!!! tip "See Also"
    - Agent: Agent lifecycle, hooks, and configuration
    - Providers: Provider implementations and streaming support


Directory: src/selectools/agent/

Key Types: StreamChunk, AgentResult (from selectools.types)

Table of Contents

  1. Overview
  2. Quick Start
  3. E2E Streaming (v0.11.0)
  4. Parallel Tool Execution (v0.11.0)
  5. Native Function Calling (v0.10.0)
  6. Routing Mode (v0.10.0)
  7. Context Propagation (v0.10.0)
  8. AgentResult (v0.9.0)
  9. Custom System Prompt (v0.9.0)
  10. Agent.reset() (v0.9.0)
  11. Performance Comparison
  12. Practical Examples
  13. Best Practices
  14. Troubleshooting
  15. Further Reading

Overview

selectools provides streaming and performance features for real-time token delivery, concurrent tool execution, and programmatic inspection of agent behavior. They range from token-level streaming (astream) and native function calling to routing without execution (routing_only) and context-preserving execution of sync tools.

Feature Summary

| Feature | Version | Purpose |
| --- | --- | --- |
| E2E Streaming | v0.11.0 | Token-by-token output with native tool call support |
| Parallel Tool Execution | v0.11.0 | Run multiple tools concurrently in a single iteration |
| Native Function Calling | v0.10.0 | Provider-native tool APIs, no regex parsing |
| Routing Mode | v0.10.0 | Select a tool without executing it (classification, intent routing) |
| Context Propagation | v0.10.0 | Preserve tracing and auth when running sync tools in executors |
| AgentResult | v0.9.0 | Structured return with message, tool metadata, iterations |
| Custom System Prompt | v0.9.0 | Inject domain instructions via AgentConfig |
| Agent.reset() | v0.9.0 | Clear state for clean reuse across requests |

Import Paths

from selectools import Agent, AgentConfig, Message, Role
from selectools.types import StreamChunk, AgentResult

Quick Start

Streaming with astream()

import asyncio
from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import StreamChunk, AgentResult

agent = Agent(
    tools=[search_tool],
    provider=OpenAIProvider(),
    config=AgentConfig(max_iterations=3),
)


async def main():
    async for item in agent.astream([Message(role=Role.USER, content="Search for Python tutorials")]):
        if isinstance(item, StreamChunk):
            print(item.content, end="", flush=True)
        elif isinstance(item, AgentResult):
            print(f"\n\nDone in {item.iterations} iterations")
            if item.tool_calls:
                print(f"Tools used: {[tc.tool_name for tc in item.tool_calls]}")


asyncio.run(main())

E2E Streaming (v0.11.0)

Agent.astream()

Agent.astream(messages) returns an AsyncGenerator yielding Union[StreamChunk, AgentResult]:

  • StreamChunk — Intermediate content chunks (text and/or tool calls)
  • AgentResult — Final result, yielded once when the agent completes

Accumulate deltas, not the final result. The terminal AgentResult.content is the whole answer, not a delta. Always discriminate by type — a naive text += item.content over every yielded item appends the full answer on top of the deltas and gives you the response twice:

text = ""
async for item in agent.astream("..."):
    if isinstance(item, StreamChunk):
        text += item.content or ""   # deltas only
    else:
        result = item                # AgentResult: full answer + metadata

StreamChunk

@dataclass
class StreamChunk:
    content: str = ""                              # Text delta
    role: Role = Role.ASSISTANT
    tool_calls: Optional[List[ToolCall]] = None    # Optional; present when chunk contains tool invocations

  • content: The text portion of this chunk
  • tool_calls: Optional list of ToolCall objects when the LLM emits tool invocations during streaming

AgentResult as Final Item

The last item yielded by astream() is always an AgentResult. It carries:

  • message — Final assistant response
  • tool_name — Last tool called (or None)
  • tool_args — Args for last tool
  • iterations — Number of loop iterations
  • tool_calls — All ToolCall objects from the run

Provider Protocol

Providers implement astream() yielding Union[str, ToolCall]:

  • Text deltas — Raw str chunks (token-by-token)
  • Tool calls — Complete ToolCall objects when ready (native function calling)

graph LR
    P["Provider.astream()"] --> A["yield 'Hello' (str)"]
    P --> B["yield ' ' (str)"]
    P --> C["yield 'world' (str)"]
    P --> D["yield ToolCall(...) (tool invocation)"]
    P --> E["yield '!' (str)"]
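The protocol above can be sketched with stand-in types. This is a minimal illustration, not the library's code: the `ToolCall` dataclass here only mirrors the `tool_name` and `parameters` fields used elsewhere on this page, and `FakeProvider` and `collect` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Tuple, Union


@dataclass
class ToolCall:
    """Stand-in for selectools' ToolCall (assumed fields only)."""
    tool_name: str
    parameters: Dict[str, Any] = field(default_factory=dict)


class FakeProvider:
    """Hypothetical provider that yields the same sequence as the diagram."""

    async def astream(self, messages: List[str]) -> AsyncIterator[Union[str, ToolCall]]:
        for part in ("Hello", " ", "world"):
            yield part                                  # raw text delta
        yield ToolCall("search", {"query": "python"})   # complete tool call
        yield "!"                                       # text may follow a tool call


async def collect(provider: FakeProvider) -> Tuple[str, List[ToolCall]]:
    """Split a provider stream into accumulated text and tool calls."""
    text, calls = "", []
    async for part in provider.astream(["hi"]):
        if isinstance(part, ToolCall):
            calls.append(part)
        else:
            text += part
    return text, calls


text, calls = asyncio.run(collect(FakeProvider()))
print(text, [c.tool_name for c in calls])
```

Discriminating on type rather than on position keeps the consumer correct even when a provider interleaves text and tool calls.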

Fallback Chain

When a provider does not support streaming:

flowchart TD
    A["astream() requested"] --> B{"Provider has astream()?"}
    B -- Yes --> C["Use it"]
    B -- No --> D{"Provider has acomplete()?"}
    D -- Yes --> E["Call it, yield full response\nas single StreamChunk"]
    D -- No --> F["Run complete() in\nThreadPoolExecutor"]
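The fallback order in the flowchart can be sketched as a single async generator. This is an assumption-laden illustration rather than the agent's actual code: `stream_with_fallback` and `SyncOnlyProvider` are hypothetical, providers are reduced to returning plain strings, and the last branch yields the blocking result as one chunk because the flowchart does not say how it is delivered.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncIterator, List


async def stream_with_fallback(provider: Any, messages: List[str]) -> AsyncIterator[str]:
    """Yield text using the best method the provider offers."""
    if hasattr(provider, "astream"):
        # Preferred path: true token-level streaming.
        async for delta in provider.astream(messages):
            yield delta
    elif hasattr(provider, "acomplete"):
        # Async but non-streaming: the whole response arrives as one chunk.
        yield await provider.acomplete(messages)
    else:
        # Blocking provider: run complete() in a worker thread so the
        # event loop stays responsive.
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=1) as pool:
            yield await loop.run_in_executor(pool, provider.complete, messages)


class SyncOnlyProvider:
    """Hypothetical provider exposing only a blocking complete()."""

    def complete(self, messages: List[str]) -> str:
        return "full response"


async def fallback_demo() -> List[str]:
    return [chunk async for chunk in stream_with_fallback(SyncOnlyProvider(), ["hi"])]


fallback_chunks = asyncio.run(fallback_demo())
print(fallback_chunks)
```

With either fallback branch the caller sees a single chunk, which is why a non-streaming provider appears to "yield nothing until complete".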

Tool Call Accumulation and Multi-Iteration

  1. Accumulation: Tool calls are accumulated as they stream in from the provider.
  2. Execution: When all tool calls in a response are ready, they are executed (in parallel if parallel_tool_execution=True).
  3. Continue: Results are appended to history; streaming continues with the next LLM call.
  4. Final result: When the LLM produces a final text response with no tool calls, AgentResult is yielded.

graph TD
    subgraph Iteration1["Iteration 1"]
        A1["StreamChunk('Searching...')"] --> A2["StreamChunk(tool_calls=[...])"]
        A2 --> A3["Tools executed"]
    end
    subgraph Iteration2["Iteration 2"]
        B1["StreamChunk('Here are the results:')"] --> B2["StreamChunk('- Result 1')"]
        B2 --> B3["..."]
        B3 --> B4["AgentResult(iterations=2, tool_calls=[...])"]
    end
    Iteration1 --> Iteration2
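The four steps above can be sketched as a small loop. Everything here is a stand-in chosen for illustration: `Chunk` and `Result` play the roles of StreamChunk and AgentResult, `fake_llm` replays a scripted provider stream, and tools run sequentially for brevity.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable, Dict, List, Union


@dataclass
class ToolCall:            # stand-in, not the selectools class
    tool_name: str
    parameters: Dict[str, Any]


@dataclass
class Chunk:               # plays the role of StreamChunk
    content: str = ""


@dataclass
class Result:              # plays the role of AgentResult
    content: str
    iterations: int
    tool_calls: List[ToolCall]


# Scripted provider output: one list per LLM call.
SCRIPT = [
    ["Searching...", ToolCall("search", {"query": "python"})],
    ["Here are ", "the results."],
]


async def fake_llm(turn: int) -> AsyncIterator[Union[str, ToolCall]]:
    for part in SCRIPT[turn]:
        yield part


async def run_stream(tools: Dict[str, Callable[..., str]], max_iterations: int = 3):
    history: List[str] = []
    all_calls: List[ToolCall] = []
    for iteration in range(1, max_iterations + 1):
        text, pending = "", []
        async for part in fake_llm(iteration - 1):
            if isinstance(part, ToolCall):
                pending.append(part)           # step 1: accumulate
            else:
                text += part
                yield Chunk(part)              # forward text deltas immediately
        if not pending:                        # step 4: no tool calls, so finish
            yield Result(text, iteration, all_calls)
            return
        for call in pending:                   # step 2: execute
            history.append(tools[call.tool_name](**call.parameters))  # step 3
        all_calls.extend(pending)


async def loop_demo() -> list:
    tools = {"search": lambda query: f"results for {query}"}
    return [item async for item in run_stream(tools)]


items = asyncio.run(loop_demo())
print(items[-1])
```

Note that text deltas from every iteration reach the caller, while the final result carries only the last iteration's text plus the accumulated tool calls.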

Parallel Tool Execution (v0.11.0)

Overview

When the LLM requests multiple tool calls in a single response (common with native function calling), the agent executes them concurrently instead of sequentially.

Configuration

config = AgentConfig(
    parallel_tool_execution=True  # Default: enabled
)

Set to False for strictly sequential execution.

Async Execution

Uses asyncio.gather() for concurrent tool runs:

results = await asyncio.gather(*[run_tool(tc) for tc in tool_calls])

Sync Execution

Uses ThreadPoolExecutor with one worker per tool:

with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
    futures = [pool.submit(run_tool, tc) for tc in tool_calls]
    results = [f.result() for f in futures]

Guarantees

| Guarantee | Description |
| --- | --- |
| Result ordering | Results appended to history in original request order |
| Error isolation | One tool failure does not block others |
| Hook invocation | on_tool_start, on_tool_end, on_tool_error fire for every tool |
| Single-tool optimization | Only one tool called → sequential path, no executor overhead |
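The ordering and error-isolation guarantees can be reproduced with plain asyncio. The sketch below is one way to get that behavior and is not taken from the library: `run_tool` and `run_all` are hypothetical, and the tools are simulated with sleeps.

```python
import asyncio
from typing import List, Tuple, Union


async def run_tool(name: str, delay: float, fail: bool = False) -> str:
    """Hypothetical tool runner: sleeps, then returns or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"


async def run_all(calls: List[Tuple[str, float, bool]]) -> List[Union[str, BaseException]]:
    # gather() returns results in argument order, not completion order.
    # return_exceptions=True collects a failure as a value instead of
    # raising it, so the other results are still returned.
    return await asyncio.gather(
        *(run_tool(*call) for call in calls), return_exceptions=True
    )


results = asyncio.run(run_all([
    ("slow", 0.05, False),    # finishes last, still listed first
    ("broken", 0.01, True),   # fails without affecting its neighbours
    ("fast", 0.0, False),
]))
print(results)
```

The failed call shows up as an exception object in its original slot, so a caller can report it per tool while still using the successful results.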

Native Function Calling (v0.10.0)

Overview

selectools uses provider-native tool APIs instead of regex parsing:

  • OpenAI: functions / tool_use in chat completions
  • Anthropic: tool_use blocks
  • Gemini: function_calling in responses

Message.tool_calls

Responses carry structured ToolCall objects on Message.tool_calls:

response = provider.complete(...)
msg = response[0]

if msg.tool_calls:
    for tc in msg.tool_calls:
        print(f"Tool: {tc.tool_name}, Args: {tc.parameters}")

No Regex Parsing

  • Providers return ToolCall objects directly.
  • No text-based patterns such as TOOL_CALL {...}.

Fallback

When a provider returns plain text only (no native tool format), the agent falls back to ToolCallParser regex parsing.
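To make the fallback concrete, here is a minimal sketch of text-based tool-call parsing. It is not ToolCallParser's implementation: the wire format (a `TOOL_CALL` marker followed by a JSON object with `tool_name` and `parameters` keys) and the function `parse_tool_call` are assumptions made for illustration.

```python
import json
import re
from typing import Any, Dict, Optional, Tuple

# Assumed format: TOOL_CALL {"tool_name": "...", "parameters": {...}}
TOOL_CALL_RE = re.compile(r"TOOL_CALL\s*(\{.*\})", re.DOTALL)


def parse_tool_call(text: str) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Return (tool_name, parameters), or None if no valid call is found."""
    match = TOOL_CALL_RE.search(text)
    if match is None:
        return None
    try:
        payload = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None                      # malformed JSON: treat as plain text
    if not isinstance(payload, dict) or "tool_name" not in payload:
        return None
    return payload["tool_name"], payload.get("parameters", {})


parsed = parse_tool_call(
    'Let me check. TOOL_CALL {"tool_name": "search", "parameters": {"query": "python"}}'
)
plain = parse_tool_call("Just a plain answer.")
print(parsed, plain)
```

Text parsing of this kind has to tolerate malformed output, which is one reason native function calling is the preferred path.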


Routing Mode (v0.10.0)

Overview

AgentConfig(routing_only=True) makes the agent choose a tool but not run it. Useful for classification, intent routing, and tool selection.

Configuration

config = AgentConfig(routing_only=True)
agent = Agent(tools=[...], provider=provider, config=config)

Return Value

Returns AgentResult with:

  • tool_name — Selected tool
  • tool_args — Parsed arguments
  • message — Assistant message containing the selection
  • trace — Execution trace (LLM call + tool selection steps)

No tool execution; only one LLM call. Observer events on_iteration_start and on_iteration_end both fire for the single iteration, along with on_run_start/on_run_end.

Use Cases

| Use Case | Example |
| --- | --- |
| Classification | Route to sales vs support vs billing |
| Intent detection | Choose between search, calculator, or Q&A |
| Tool preselection | Decide which tools to enable before full execution |

Example

from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import AgentResult

config = AgentConfig(routing_only=True)
agent = Agent(
    tools=[search_tool, calculator_tool, support_tool],
    provider=OpenAIProvider(),
    config=config,
)

result = agent.run([Message(role=Role.USER, content="I need help with my bill")])

# Inspect the routing decision; no tool has been executed.
# The selection depends on the model, so these checks are illustrative.
assert result.tool_name == "support_tool"
assert "bill" in str(result.tool_args).lower()  # also matches "billing"

Context Propagation (v0.10.0)

Overview

When sync tools run inside a ThreadPoolExecutor (e.g. async agent calling sync tools), contextvars.copy_context() is used so request-scoped state (tracing, auth, etc.) is preserved.

How It Works

# In tools/base.py - sync tool execution from async context
context = contextvars.copy_context()
func_with_args = functools.partial(self.function, **call_args)
result = await loop.run_in_executor(executor, context.run, func_with_args)

Preserved State

  • OpenTelemetry tracing spans
  • Auth tokens
  • Request IDs
  • Other contextvars values

Async Tools

Async tools run in the same event loop as the agent; no executor, so context is already intact.
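The effect of copy_context() is easy to observe outside the library. The following standalone sketch uses only the standard library; `request_id`, `sync_tool`, and `call_in_executor` are hypothetical names, and the executor is created locally for the demo.

```python
import asyncio
import contextvars
import functools
from concurrent.futures import ThreadPoolExecutor

# Request-scoped state, e.g. a request ID set by web middleware.
request_id = contextvars.ContextVar("request_id", default="unset")


def sync_tool(query: str) -> str:
    """Blocking tool that reads request-scoped state."""
    return f"{request_id.get()}:{query}"


async def call_in_executor(propagate: bool) -> str:
    request_id.set("req-42")
    loop = asyncio.get_running_loop()
    func = functools.partial(sync_tool, query="python")
    with ThreadPoolExecutor(max_workers=1) as executor:
        if propagate:
            # Snapshot the caller's context and run the tool inside it.
            context = contextvars.copy_context()
            return await loop.run_in_executor(executor, context.run, func)
        # Without the snapshot, the worker thread sees only defaults.
        return await loop.run_in_executor(executor, func)


with_context = asyncio.run(call_in_executor(propagate=True))
without_context = asyncio.run(call_in_executor(propagate=False))
print(with_context, without_context)
```

Because the tool runs inside a copy, any values it sets on context variables stay in that copy and do not leak back to the caller.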


AgentResult (v0.9.0)

Overview

agent.run() and agent.arun() return AgentResult instead of Message, enabling programmatic inspection of tool usage and iterations.

Fields

| Field | Type | Description |
| --- | --- | --- |
| message | Message | Final assistant response |
| tool_name | Optional[str] | Last tool called, or None |
| tool_args | Dict[str, Any] | Args for last tool call |
| iterations | int | Number of agent loop iterations |
| tool_calls | List[ToolCall] | All tool calls in order |

Backward Compatibility

  • result.content → result.message.content
  • result.role → result.message.role
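One common way to provide aliases like these is with read-only properties that delegate to the wrapped message. The sketch below shows that pattern with stand-in classes; it is an assumption about how such compatibility could be built, not a description of the selectools source.

```python
from dataclasses import dataclass


@dataclass
class Message:             # stand-in for selectools' Message
    role: str
    content: str


@dataclass
class AgentResult:         # stand-in with only the fields needed here
    message: Message
    iterations: int = 1

    @property
    def content(self) -> str:
        # Lets code written against the old Message return type keep working.
        return self.message.content

    @property
    def role(self) -> str:
        return self.message.role


legacy = AgentResult(Message(role="assistant", content="Sunny in Tokyo"))
print(legacy.content, legacy.role)
```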

Example

result = agent.run([Message(role=Role.USER, content="What's the weather in Tokyo?")])

print(result.content)           # Final text
print(result.tool_name)         # e.g. "get_weather"
print(result.tool_args)         # e.g. {"location": "Tokyo"}
print(result.iterations)        # e.g. 2
print(len(result.tool_calls))   # Number of tools invoked

Custom System Prompt (v0.9.0)

Overview

AgentConfig(system_prompt="...") injects domain instructions before tool schemas. They persist across iterations.

Configuration

config = AgentConfig(
    system_prompt="You are a medical assistant. Only provide information you are confident about."
)
agent = Agent(tools=[...], provider=provider, config=config)

When to Use

  • Domain constraints (medical, legal, etc.)
  • Tone and persona
  • Guardrails and safety
  • Language or formatting rules

Example

config = AgentConfig(
    system_prompt="""You are a financial advisor.
    - Never guarantee returns.
    - Always recommend consulting a licensed professional.
    - Use clear, non-technical language."""
)
agent = Agent(tools=[lookup_stock, get_news], provider=provider, config=config)

Agent.reset() (v0.9.0)

Overview

Agent.reset() clears history, usage stats, analytics, and memory so the same agent instance can be reused across requests.

What It Clears

  • _history — Message history
  • usage — Token/cost stats
  • analytics — If enabled
  • memory — If ConversationMemory is set, calls memory.clear()

Pattern

agent = Agent(tools=[...], provider=provider, memory=ConversationMemory())

# Create once, reset between requests
for user_request in requests:
    agent.reset()
    result = agent.run([Message(role=Role.USER, content=user_request)])

Performance Comparison

Sequential vs Parallel Tool Execution

| Scenario | Sequential | Parallel | Speedup |
| --- | --- | --- | --- |
| 3 tools × 0.15s each | ~0.45s | ~0.15s | ~3× |
| 5 tools × 0.2s each | ~1.0s | ~0.2s | ~5× |
| 1 tool | 0.15s | 0.15s | 1× (no overhead) |

Benchmark Example

import time
from selectools import Agent, AgentConfig, Message, Role, tool

@tool(description="Simulate slow API")
def slow_api(delay: float) -> str:
    time.sleep(delay)
    return f"Done after {delay}s"

agent_parallel = Agent(
    tools=[slow_api],
    provider=provider,
    config=AgentConfig(parallel_tool_execution=True, max_iterations=2),
)
agent_sequential = Agent(
    tools=[slow_api],
    provider=provider,
    config=AgentConfig(parallel_tool_execution=False, max_iterations=2),
)

# With a prompt that triggers 3 tool calls:
# parallel: ~0.15s
# sequential: ~0.45s
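The timings quoted above can be checked without a provider or an LLM. This standalone sketch measures three simulated 0.15s calls run back to back and then in a thread pool; `slow_call`, `timed`, `sequential`, and `parallel` are hypothetical helpers, and real numbers will vary with machine load.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

DELAY = 0.15  # seconds per simulated tool call


def slow_call(i: int) -> int:
    time.sleep(DELAY)
    return i


def timed(fn: Callable[[], List[int]]) -> float:
    """Return the wall-clock duration of fn() in seconds."""
    start = time.perf_counter()
    fn()
    return time.perf_counter() - start


def sequential() -> List[int]:
    return [slow_call(i) for i in range(3)]


def parallel() -> List[int]:
    # One worker per call, matching the sync execution strategy described earlier.
    with ThreadPoolExecutor(max_workers=3) as pool:
        return list(pool.map(slow_call, range(3)))


seq_s, par_s = timed(sequential), timed(parallel)
print(f"sequential: {seq_s:.2f}s  parallel: {par_s:.2f}s")
```

The speedup only applies to tools that wait on I/O or sleep; CPU-bound Python tools in threads are limited by the GIL.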

Practical Examples

Routing Mode for Intent Classification

config = AgentConfig(routing_only=True)
agent = Agent(
    tools=[sales_tool, support_tool, billing_tool],
    provider=provider,
    config=config,
)

intent = agent.run([Message(role=Role.USER, content=user_message)])
if intent.tool_name == "sales_tool":
    route_to_sales_team(intent.tool_args)
elif intent.tool_name == "support_tool":
    create_support_ticket(intent.tool_args)
else:
    forward_to_billing(intent.tool_args)

AgentResult Inspection for Analytics

result = agent.run(messages)

if result.tool_calls:
    for tc in result.tool_calls:
        log_tool_usage(tc.tool_name, tc.parameters)

if result.iterations > 3:
    alert_complex_conversation()

System Prompt for Domain Experts

config = AgentConfig(
    system_prompt="You are a Python expert. Prefer type hints and modern syntax. Suggest tests when relevant.",
    max_iterations=5,
)
agent = Agent(tools=[search_docs, run_code], provider=provider, config=config)

Best Practices

1. Use astream() for Responsive UX

async for item in agent.astream(messages):
    if isinstance(item, StreamChunk):
        await websocket.send_json({"type": "chunk", "content": item.content})
    elif isinstance(item, AgentResult):
        await websocket.send_json({"type": "done", "iterations": item.iterations})

2. Keep parallel_tool_execution Enabled

Default is True; disable only when tool ordering or side effects require sequential execution.

3. Prefer routing_only for Classification

Use routing mode for cheap classification instead of a full agent run.

4. Reuse Agents with reset()

agent = Agent(...)
for req in queue:
    agent.reset()
    result = agent.run(req)

5. Use AgentResult for Observability

Use result.tool_calls and result.iterations for logging and monitoring.


Troubleshooting

Streaming Yields Nothing Until Complete

Cause: Provider lacks astream(); agent falls back to acomplete() and yields a single chunk.

Fix: Use a provider that implements astream() (e.g. OpenAI, Anthropic, Gemini).

Parallel Tools Seem Sequential

Cause: parallel_tool_execution=False or only one tool per response.

Fix: Set AgentConfig(parallel_tool_execution=True) and use prompts that trigger multiple tools.

Context Lost in Sync Tools

Cause: Older selectools versions or custom executor usage without context propagation.

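Fix: Upgrade to v0.10.0+, where sync tools called from an async agent run inside a copy of the caller's context. If you submit work to your own executor, wrap the call with contextvars.copy_context().run.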

routing_only Still Executes Tools

Cause: Misconfiguration or different code path.

Fix: Ensure AgentConfig(routing_only=True) is passed to Agent, not just AgentConfig().


Multimodal Messages (v0.21.0)

Stability: beta

Selectools supports multimodal messages through the ContentPart, image_message(), and text_content() helpers. These let you send images alongside text to vision-capable models.

ContentPart

from selectools.types import ContentPart, Message, Role

# Build a message with multiple content parts
parts = [
    ContentPart(type="text", text="What's in these images?"),
    ContentPart(type="image_url", image_url="https://example.com/photo.jpg"),
    ContentPart(type="image_base64", image_base64="...", media_type="image/png"),
]

msg = Message(role=Role.USER, content="", content_parts=parts)

image_message() Helper

from selectools.types import image_message

# From a URL
msg = image_message("https://example.com/photo.jpg", prompt="Describe this image.")

# From a local file path
msg = image_message("/path/to/photo.png", prompt="What do you see?")

text_content() Helper

Extract text from a message regardless of format:

from selectools.types import text_content

# Works with both plain content and content_parts messages
text = text_content(message)

Provider Handling

When streaming with astream(), providers serialize content_parts into their native multimodal format (OpenAI content arrays, Anthropic content blocks, Gemini parts). The streaming pipeline handles content parts transparently: text deltas and tool calls are yielded as usual.
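As an illustration of what such serialization might look like for OpenAI-style content arrays, here is a minimal sketch. The `ContentPart` dataclass is a stand-in that mirrors the fields shown earlier, and `to_openai_content` is a hypothetical function, not the provider's actual code. Base64 images are sent as data URLs, which is how the OpenAI chat API accepts inline images.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ContentPart:         # stand-in mirroring the fields used on this page
    type: str
    text: Optional[str] = None
    image_url: Optional[str] = None
    image_base64: Optional[str] = None
    media_type: Optional[str] = None


def to_openai_content(parts: List[ContentPart]) -> List[Dict[str, Any]]:
    """Convert content parts into an OpenAI-style content array."""
    out: List[Dict[str, Any]] = []
    for part in parts:
        if part.type == "text":
            out.append({"type": "text", "text": part.text or ""})
        elif part.type == "image_url":
            out.append({"type": "image_url", "image_url": {"url": part.image_url}})
        elif part.type == "image_base64":
            # Inline images travel as data URLs.
            data_url = f"data:{part.media_type};base64,{part.image_base64}"
            out.append({"type": "image_url", "image_url": {"url": data_url}})
        else:
            raise ValueError(f"unsupported content part type: {part.type}")
    return out


payload = to_openai_content([
    ContentPart(type="text", text="What's in this image?"),
    ContentPart(type="image_base64", image_base64="AAAA", media_type="image/png"),
])
print(payload)
```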


Further Reading


Next Steps: Enable streaming with agent.astream() and optimize tool-heavy workflows with parallel_tool_execution=True.


| # | Script | Description |
| --- | --- | --- |
| 07 | 07_streaming_tools.py | Token-level async streaming with tool call support |
| 08 | 08_streaming_parallel.py | Streaming with parallel tool execution |