Chapter 6: Streaming & Tracing
March 21, 2026 · View on GitHub
In Chapter 5 you added safety guardrails to your agents. Now we make agents observable — both in real time (streaming) and after the fact (tracing). These are essential for production UIs and debugging.
Streaming Architecture
The Runner.run_streamed() method returns an async iterator of events. Each event describes something happening in the agentic loop — a model token, a tool call starting, a handoff occurring, etc.
flowchart LR
A[Runner.run_streamed] --> B[Event Stream]
B --> C[RawResponsesStreamEvent]
B --> D[RunItemStreamEvent]
B --> E[AgentUpdatedStreamEvent]
C --> C1[Model tokens]
D --> D1[Tool calls]
D --> D2[Tool results]
D --> D3[Messages]
E --> E1[Handoff occurred]
classDef source fill:#e1f5fe,stroke:#01579b
classDef event fill:#f3e5f5,stroke:#4a148c
classDef detail fill:#e8f5e8,stroke:#1b5e20
class A source
class B,C,D,E event
class C1,D1,D2,D3,E1 detail
Basic Streaming
from agents import Agent, Runner
import asyncio
agent = Agent(
name="Storyteller",
instructions="Tell creative short stories. Be vivid and engaging.",
)
async def stream_response():
result = Runner.run_streamed(agent, input="Tell me a story about a robot learning to paint.")
async for event in result.stream_events():
# RawResponsesStreamEvent contains model output tokens
if event.type == "raw_response_event" and hasattr(event.data, "delta"):
print(event.data.delta, end="", flush=True)
print() # Newline at end
# After streaming completes, the final result is available
final = result.final_output
print(f"\n[Handled by: {result.last_agent.name}]")
asyncio.run(stream_response())
Stream Event Types
The SDK emits three categories of events:
1. RawResponsesStreamEvent
Raw model output, including text deltas (tokens) as they arrive:
from agents.stream_events import RawResponsesStreamEvent
async for event in result.stream_events():
if isinstance(event, RawResponsesStreamEvent):
# event.data contains the raw streaming chunk from the model
# Useful for token-by-token display
pass
2. RunItemStreamEvent
Higher-level items: messages, tool calls, tool results, handoffs:
from agents.stream_events import RunItemStreamEvent
async for event in result.stream_events():
if isinstance(event, RunItemStreamEvent):
item = event.item
if item.type == "tool_call_item":
print(f"[Calling tool: {item.raw_item.name}]")
elif item.type == "tool_call_output_item":
print(f"[Tool result received]")
elif item.type == "message_output_item":
print(f"[Agent message]")
3. AgentUpdatedStreamEvent
Fired when the current agent changes (due to a handoff):
from agents.stream_events import AgentUpdatedStreamEvent
async for event in result.stream_events():
if isinstance(event, AgentUpdatedStreamEvent):
print(f"[Handoff: now talking to {event.new_agent.name}]")
Building a Chat UI with Streaming
Here is a complete example suitable for a terminal chat or web socket relay:
from agents import Agent, Runner
from agents.stream_events import (
RawResponsesStreamEvent,
RunItemStreamEvent,
AgentUpdatedStreamEvent,
)
import asyncio
support_agent = Agent(
name="Support",
instructions="Help users with their questions. Be thorough.",
)
async def chat_stream(user_message: str):
result = Runner.run_streamed(support_agent, input=user_message)
current_agent = support_agent.name
async for event in result.stream_events():
if isinstance(event, AgentUpdatedStreamEvent):
current_agent = event.new_agent.name
yield {"type": "agent_change", "agent": current_agent}
elif isinstance(event, RunItemStreamEvent):
if event.item.type == "tool_call_item":
yield {"type": "tool_start", "tool": event.item.raw_item.name}
elif event.item.type == "tool_call_output_item":
yield {"type": "tool_end"}
elif isinstance(event, RawResponsesStreamEvent):
if hasattr(event.data, "delta"):
yield {"type": "token", "text": event.data.delta}
yield {"type": "done", "agent": result.last_agent.name}
# Usage
async def main():
async for chunk in chat_stream("How do I reset my password?"):
if chunk["type"] == "token":
print(chunk["text"], end="", flush=True)
elif chunk["type"] == "agent_change":
print(f"\n[Transferred to {chunk['agent']}]")
elif chunk["type"] == "tool_start":
print(f"\n[Using tool: {chunk['tool']}]", end="")
elif chunk["type"] == "done":
print(f"\n[Completed by {chunk['agent']}]")
asyncio.run(main())
Tracing
Every call to Runner.run() or Runner.run_streamed() is automatically traced. Traces capture the full execution timeline: model calls, tool executions, handoffs, guardrail checks, and their durations.
Tracing Architecture
flowchart TD
A[Runner.run] --> B[Trace Created]
B --> C[Agent Span]
C --> D[Model Call Span]
C --> E[Tool Execution Span]
C --> F[Guardrail Span]
C --> G[Handoff Span]
G --> H[New Agent Span]
H --> I[Model Call Span]
B --> J[Trace Exported]
J --> K[OpenAI Dashboard]
J --> L[Custom Processor]
J --> M[Console Output]
classDef trace fill:#e1f5fe,stroke:#01579b
classDef span fill:#f3e5f5,stroke:#4a148c
classDef export fill:#e8f5e8,stroke:#1b5e20
class A,B trace
class C,D,E,F,G,H,I span
class J,K,L,M export
Viewing Traces in the OpenAI Dashboard
By default, traces are sent to the OpenAI platform and visible in your dashboard at platform.openai.com. No configuration needed — if your OPENAI_API_KEY is set, tracing works automatically.
Trace Configuration
from agents import Agent, Runner, trace
import asyncio
agent = Agent(name="Helper", instructions="Be helpful.")
async def traced_run():
# Custom trace name and metadata
with trace(
workflow_name="customer_support",
trace_id=None, # Auto-generated if None
group_id="session_abc123", # Group related traces
metadata={"user_id": "u_42", "channel": "web"},
disabled=False, # Set True to disable tracing
):
result = await Runner.run(agent, input="Hello")
print(result.final_output)
asyncio.run(traced_run())
Disabling Tracing
from agents import set_tracing_disabled
# Globally disable tracing (e.g., in tests)
set_tracing_disabled(True)
# Or per-run with the trace context manager
with trace(disabled=True):
result = await Runner.run(agent, input="Hello")
Custom Trace Processors
Send traces to your own observability stack:
from agents.tracing import TracingProcessor, Span, Trace
class DatadogProcessor(TracingProcessor):
"""Send trace data to Datadog APM."""
def on_trace_start(self, trace: Trace) -> None:
# Start a Datadog trace
print(f"[Datadog] Trace started: {trace.trace_id}")
def on_span_start(self, span: Span) -> None:
print(f"[Datadog] Span started: {span.span_id} ({span.span_type})")
def on_span_end(self, span: Span) -> None:
duration = span.ended_at - span.started_at
print(f"[Datadog] Span ended: {span.span_id} ({duration:.2f}s)")
def on_trace_end(self, trace: Trace) -> None:
print(f"[Datadog] Trace complete: {trace.trace_id}")
# Register custom processor
from agents.tracing import add_trace_processor
add_trace_processor(DatadogProcessor())
Multi-Backend Tracing
from agents.tracing import add_trace_processor
# Traces go to all registered processors + OpenAI dashboard
add_trace_processor(DatadogProcessor())
add_trace_processor(PrometheusProcessor())
add_trace_processor(FileLogProcessor(path="/var/log/agents/traces.jsonl"))
Custom Spans
Add your own spans to annotate business logic within a trace:
from agents import Agent, Runner, function_tool
from agents.tracing import custom_span
import asyncio
@function_tool
async def process_document(document_id: str) -> str:
"""Process a document by ID.
Args:
document_id: The ID of the document to process.
"""
with custom_span(
name="document_processing",
data={"document_id": document_id},
):
# Your processing logic
await asyncio.sleep(0.5) # Simulate work
return f"Document {document_id} processed successfully."
agent = Agent(
name="Doc Processor",
instructions="Process documents when asked.",
tools=[process_document],
)
Combining Streaming and Tracing
Streaming and tracing work together seamlessly:
from agents import Agent, Runner, trace
import asyncio
agent = Agent(
name="Assistant",
instructions="Be helpful and thorough.",
)
async def traced_stream():
with trace(workflow_name="chat_session", group_id="session_123"):
result = Runner.run_streamed(agent, input="Explain quantum computing.")
token_count = 0
async for event in result.stream_events():
if event.type == "raw_response_event" and hasattr(event.data, "delta"):
print(event.data.delta, end="", flush=True)
token_count += 1
print(f"\n\n[Tokens streamed: {token_count}]")
# Trace is automatically captured with streaming metadata
asyncio.run(traced_stream())
Debugging with Traces
When something goes wrong, traces show you exactly where:
from agents import Agent, Runner, function_tool
from agents.exceptions import MaxTurnsExceeded
import asyncio
@function_tool
def flaky_tool(query: str) -> str:
"""A tool that sometimes fails.
Args:
query: The search query.
"""
import random
if random.random() < 0.5:
raise ValueError("Service temporarily unavailable")
return f"Results for: {query}"
agent = Agent(
name="Debuggable Agent",
instructions="Search for information. If a tool fails, try again.",
tools=[flaky_tool],
)
async def debug_run():
try:
result = await Runner.run(agent, input="Search for AI news", max_turns=5)
print(result.final_output)
except MaxTurnsExceeded:
print("Agent exceeded max turns — check trace for tool failure pattern")
# Check the OpenAI dashboard for the full trace timeline
asyncio.run(debug_run())
What We've Accomplished
- Implemented real-time streaming with
Runner.run_streamed()and event handlers - Understood the three event types: raw responses, run items, and agent updates
- Built a chat-UI-ready streaming handler with tool and handoff indicators
- Explored automatic tracing and the OpenAI dashboard
- Configured custom trace processors for Datadog, Prometheus, or file logging
- Added custom spans to annotate business logic within traces
- Combined streaming and tracing for full production observability
Next Steps
Now you have observable, safe, streaming agents. In Chapter 7: Multi-Agent Patterns, we'll put everything together into proven architectural patterns: orchestrator-worker, pipeline, parallel fan-out, and more.
Source Walkthrough
src/agents/stream_events.py— Stream event typessrc/agents/run.py— run_streamed implementationsrc/agents/tracing/— Tracing infrastructure