amplifier-core Contracts

April 25, 2026

Purpose: This document is the authoritative Rust↔Python type mapping for coding agents working on either side of the boundary. Read this before modifying any shared type, trait/protocol, or error.

Naming Convention

| Concept | Rust | Python |
| --- | --- | --- |
| Data model | struct Foo with #[derive(Serialize, Deserialize)] | class Foo(BaseModel) (Pydantic v2) |
| Interface | trait Bar (async, dyn-safe) | class Bar(Protocol) (structural typing) |
| Enum (string) | enum Baz { Variant } with #[serde(rename_all = "snake_case")] | Literal["variant"] |
| Tagged union | enum E { A { .. }, B { .. } } with #[serde(tag = "type")] | Discriminated Union[A, B] |
| Error | Result<T, E> with thiserror | T (raises exception) |
| Optional | Option<T> | T \| None |
| List | Vec<T> | list[T] |
| Map | HashMap<K, V> | dict[K, V] |
| JSON blob | serde_json::Value | dict[str, Any] |

Serialization boundary: All data crosses the PyO3 bridge as JSON (via serde_json::to_string → json.loads and vice versa). Field names must be identical on both sides. Rust uses #[serde(rename = "...")] where the Rust field name differs from the JSON key.
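As an illustration of the boundary, here is a minimal sketch of the Python half of a round-trip. It uses a stdlib dataclass as a stand-in for the Pydantic model so that it runs without dependencies. The id field and the helper names from_bridge_json and to_bridge_json are hypothetical, and the JSON string stands in for what serde_json::to_string would emit on the Rust side.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ModuleInfo:
    """Stand-in for the shared model; only the "type" key comes from this document."""

    id: str  # hypothetical field, for illustration only
    type: str  # the Rust field module_type is renamed to this JSON key


def from_bridge_json(payload: str) -> ModuleInfo:
    """Parse a JSON string shaped like the Rust side's output."""
    return ModuleInfo(**json.loads(payload))


def to_bridge_json(info: ModuleInfo) -> str:
    """Serialize back using the same key names the Rust side expects."""
    return json.dumps(asdict(info), sort_keys=True)


wire = '{"id": "tool-search", "type": "tool"}'
info = from_bridge_json(wire)
round_tripped = to_bridge_json(info)
```

Because the dataclass is built with keyword expansion, a key that exists on only one side fails at parse time instead of being dropped silently.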


Trait ↔ Protocol Mapping

| Rust Trait | Location (Rust) | Python Protocol | Location (Python) | Notes |
| --- | --- | --- | --- | --- |
| Tool | crates/amplifier-core/src/traits.rs | Tool | interfaces.py | Rust execute takes Value; Python takes dict[str, Any]. Rust adds get_spec() -> ToolSpec. |
| Provider | crates/amplifier-core/src/traits.rs | Provider | interfaces.py | 1:1 (name, get_info, list_models, complete, parse_tool_calls). |
| Orchestrator | crates/amplifier-core/src/traits.rs | Orchestrator | interfaces.py | Rust passes hooks/coordinator as Value; Python passes typed objects + **kwargs. |
| ContextManager | crates/amplifier-core/src/traits.rs | ContextManager | interfaces.py | 1:1 (add_message, get_messages_for_request, get_messages, set_messages, clear). |
| HookHandler | crates/amplifier-core/src/traits.rs | HookHandler | interfaces.py | Rust: handle(event, data); Python: __call__(event, data). |
| ApprovalProvider | crates/amplifier-core/src/traits.rs | ApprovalProvider | interfaces.py | 1:1 (request_approval(ApprovalRequest) -> ApprovalResponse). |
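To show what structural typing means on the Python side, the sketch below declares a HookHandler-shaped Protocol and a class that satisfies it without inheriting from it. The parameter and return annotations are assumptions (a plain dict stands in for HookResult), and AuditHook is a hypothetical handler.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class HookHandler(Protocol):
    """Assumed shape: Python handlers are called directly; Rust uses handle()."""

    async def __call__(self, event: str, data: dict[str, Any]) -> dict[str, Any]: ...


class AuditHook:
    """Hypothetical handler; it matches the protocol by shape, not by inheritance."""

    def __init__(self) -> None:
        self.seen: list[str] = []

    async def __call__(self, event: str, data: dict[str, Any]) -> dict[str, Any]:
        self.seen.append(event)
        return {"action": "continue"}  # dict stand-in for a HookResult


hook = AuditHook()
is_handler = isinstance(hook, HookHandler)  # runtime check only looks for __call__
result = asyncio.run(hook("tool:pre", {"tool": "search"}))
```

The runtime isinstance check only confirms that the member exists; signature agreement still has to be verified by a type checker or by tests.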

Data Model Mapping

Core Models (models.rs ↔ models.py)

| Rust Struct/Enum | Python Class | Serialization | Notes |
| --- | --- | --- | --- |
| HookResult | HookResult (BaseModel) | JSON round-trip at PyO3 boundary | Field-for-field match. Rust HookAction enum ↔ Python Literal strings. |
| HookAction | Literal["continue","deny","modify","inject_context","ask_user"] | serde(rename_all = "snake_case") | Enum variants map 1:1 to string literals. |
| ToolResult | ToolResult (BaseModel) | JSON round-trip | 1:1. Python adds __str__() and get_serialized_output() convenience methods. |
| ModelInfo | ModelInfo (BaseModel) | JSON round-trip | 1:1. |
| ConfigField | ConfigField (BaseModel) | JSON round-trip | Rust default_value ↔ Python default. |
| ConfigFieldType | Literal["text","secret","choice","boolean"] | snake_case | |
| ProviderInfo | ProviderInfo (BaseModel) | JSON round-trip | 1:1. |
| ModuleInfo | ModuleInfo (BaseModel) | JSON round-trip | Rust module_type serializes as JSON key "type". |
| ModuleType | Literal["orchestrator","provider","tool","context","hook","resolver"] | snake_case | |
| SessionStatus | SessionStatus (BaseModel) | JSON round-trip | 1:1. Rust started_at is String; Python is datetime. |
| SessionState | Literal["running","completed","failed","cancelled"] | snake_case | |
| ContextInjectionRole | Literal["system","user","assistant"] | snake_case | |
| ApprovalDefault | Literal["allow","deny"] | snake_case | |
| UserMessageLevel | Literal["info","warning","error"] | snake_case | |
| ApprovalRequest | ApprovalRequest (BaseModel) | JSON round-trip | 1:1. Python has model_post_init validation. |
| ApprovalResponse | ApprovalResponse (BaseModel) | JSON round-trip | 1:1. |
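The enum rows above rely on Rust variant names converting to the same snake_case strings that the Python Literal lists. The following sketch checks that correspondence for HookAction. The PascalCase variant names are assumptions inferred from the literals, and to_snake_case is a simplified re-implementation of the renaming rule, not the serde code itself.

```python
from typing import Literal, get_args

HookAction = Literal["continue", "deny", "modify", "inject_context", "ask_user"]

# Assumed Rust variant names, inferred from the literals above.
RUST_VARIANTS = ["Continue", "Deny", "Modify", "InjectContext", "AskUser"]


def to_snake_case(variant: str) -> str:
    """Insert "_" before each non-leading uppercase letter, then lowercase."""
    parts = []
    for index, char in enumerate(variant):
        if char.isupper() and index > 0:
            parts.append("_")
        parts.append(char.lower())
    return "".join(parts)


def parse_hook_action(value: str) -> str:
    """Reject any string that is not one of the agreed wire values."""
    if value not in get_args(HookAction):
        raise ValueError(f"unknown hook action: {value!r}")
    return value


wire_values = [to_snake_case(v) for v in RUST_VARIANTS]
```

A check like this can run in a test on either side to catch a variant that was added in one language only.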

Message Models (messages.rs ↔ message_models.py)

| Rust Type | Python Type | Serialization | Notes |
| --- | --- | --- | --- |
| Message | Message (BaseModel) | JSON round-trip | 1:1 fields. |
| ContentBlock (tagged enum) | ContentBlockUnion (discriminated Union) | serde(tag = "type") | Rust variants = Python separate BaseModel classes. |
| ToolSpec | ToolSpec (BaseModel) | JSON round-trip | 1:1. |
| ChatRequest | ChatRequest (BaseModel) | JSON round-trip | 1:1. |
| ToolCall | ToolCall (BaseModel) | JSON round-trip | 1:1. |
| Usage | Usage (BaseModel) | JSON round-trip | 1:1. |
| Degradation | Degradation (BaseModel) | JSON round-trip | 1:1. |
| ChatResponse | ChatResponse (BaseModel) | JSON round-trip | 1:1. |
| ResponseFormat (tagged enum) | ResponseFormat (Union) | serde(tag = "type") | Text/Json/JsonSchema variants match. |
| Role (enum) | Literal["system","developer","user","assistant","function","tool"] | snake_case | |
| Visibility (enum) | Literal["internal","developer","user"] | snake_case | |

Content Block Variants

Python uses separate classes for each content block type. Rust uses variants of the ContentBlock enum.

| Rust Variant | Python Class | Location (Python) |
| --- | --- | --- |
| ContentBlock::Text | TextBlock | message_models.py |
| ContentBlock::Thinking | ThinkingBlock | message_models.py |
| ContentBlock::RedactedThinking | RedactedThinkingBlock | message_models.py |
| ContentBlock::ToolCall | ToolCallBlock | message_models.py |
| ContentBlock::ToolResult | ToolResultBlock | message_models.py |
| ContentBlock::Image | ImageBlock | message_models.py |
| ContentBlock::Reasoning | ReasoningBlock | message_models.py |
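The variant-to-class mapping works because every serialized block carries a "type" discriminator. The sketch below shows that dispatch with stdlib dataclasses standing in for the Pydantic classes. Only two variants are modelled, and the payload field names (text, thinking) and tag values are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass
class TextBlock:
    text: str  # assumed payload field
    type: str = "text"


@dataclass
class ThinkingBlock:
    thinking: str  # assumed payload field
    type: str = "thinking"


# Maps the "type" discriminator to the class that models that variant.
BLOCK_CLASSES = {"text": TextBlock, "thinking": ThinkingBlock}


def parse_block(raw: dict[str, Any]) -> Union[TextBlock, ThinkingBlock]:
    """Pick the class from the tag, then build it from the remaining keys."""
    tag = raw.get("type")
    block_class = BLOCK_CLASSES.get(tag)
    if block_class is None:
        raise ValueError(f"unknown content block type: {tag!r}")
    return block_class(**raw)


block = parse_block({"type": "thinking", "thinking": "check the cache first"})
```

Pydantic's discriminated unions perform the same lookup declaratively; the explicit table here only makes the mechanism visible.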

Streaming Content Models (content_models.py)

| Rust Equivalent | Python Class | Notes |
| --- | --- | --- |
| ContentBlock::Text variant | TextContent (dataclass) | |
| ContentBlock::Thinking variant | ThinkingContent (dataclass) | |
| ContentBlock::ToolCall variant | ToolCallContent (dataclass) | |
| ContentBlock::ToolResult variant | ToolResultContent (dataclass) | |
| ContentBlockType enum | ContentBlockType (str Enum) | Text/Thinking/ToolCall/ToolResult |

Behavioral Type Mapping

These are the core engine types that the Rust kernel implements and the PyO3 bridge exposes.

| Rust Type | PyO3 Wrapper | Python Name | Python Original | Notes |
| --- | --- | --- | --- | --- |
| Session | RustSession | AmplifierSession | session.py:AmplifierSession | Rust is the default export. Rust is leaner: no ModuleLoader, no auto-load in initialize(). |
| Coordinator | RustCoordinator | ModuleCoordinator | coordinator.py:ModuleCoordinator | Rust is the default export. Rust has core mount/get/hooks/cancel. Python adds process_hook_result, session back-refs, budget limits. |
| HookRegistry | RustHookRegistry | HookRegistry | hooks.py:HookRegistry | Rust is the default export. 1:1 core API: register, emit, unregister, list_handlers. |
| CancellationToken | RustCancellationToken | CancellationToken | cancellation.py:CancellationToken | Rust is the default export. 1:1: state, is_cancelled, request_graceful, request_immediate, reset. |
| CancellationState | (stays Python) | CancellationState | cancellation.py:CancellationState | Simple enum; no Rust bridge needed. |
| SessionConfig | (internal) | (dict) | (inline in __init__) | Rust-specific typed config. Python uses raw dict. |

The switchover from Python to Rust implementations is complete. Rust types are now the default exports for top-level imports (from amplifier_core import ...). Python implementations remain accessible via submodule imports for backward compatibility. The Rust* prefixed names (RustSession, RustHookRegistry, etc.) are still available as explicit aliases.


Error Mapping

LLM/Provider Errors (errors.rs:ProviderError ↔ llm_errors.py)

| Rust Variant | Python Exception | Notes |
| --- | --- | --- |
| ProviderError::RateLimit | RateLimitError | Rust has retry_after: Option<f64> field. |
| ProviderError::Authentication | AuthenticationError | |
| ProviderError::ContextLength | ContextLengthError | |
| ProviderError::ContentFilter | ContentFilterError | |
| ProviderError::InvalidRequest | InvalidRequestError | |
| ProviderError::Unavailable | ProviderUnavailableError | |
| ProviderError::Timeout | LLMTimeoutError | |
| ProviderError::Other | LLMError (base class) | Catch-all. |
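One way to picture this table is as a lookup from Rust variant name to Python exception class. The sketch below redeclares the exception names locally so that it runs on its own. The flat hierarchy under LLMError, the retry_after attribute on the Python RateLimitError, and the to_exception helper are assumptions, not a description of llm_errors.py.

```python
from typing import Optional


class LLMError(Exception):
    """Base class; ProviderError::Other maps here."""


class RateLimitError(LLMError):
    def __init__(self, message: str, retry_after: Optional[float] = None) -> None:
        super().__init__(message)
        self.retry_after = retry_after  # assumed attribute mirroring the Rust field


class AuthenticationError(LLMError): ...
class ContextLengthError(LLMError): ...
class ContentFilterError(LLMError): ...
class InvalidRequestError(LLMError): ...
class ProviderUnavailableError(LLMError): ...
class LLMTimeoutError(LLMError): ...


VARIANT_TO_EXCEPTION = {
    "RateLimit": RateLimitError,
    "Authentication": AuthenticationError,
    "ContextLength": ContextLengthError,
    "ContentFilter": ContentFilterError,
    "InvalidRequest": InvalidRequestError,
    "Unavailable": ProviderUnavailableError,
    "Timeout": LLMTimeoutError,
    "Other": LLMError,
}


def to_exception(variant: str, message: str,
                 retry_after: Optional[float] = None) -> LLMError:
    """Build the Python exception for a Rust variant; unknown names fall back to LLMError."""
    exc_type = VARIANT_TO_EXCEPTION.get(variant, LLMError)
    if exc_type is RateLimitError:
        return RateLimitError(message, retry_after=retry_after)
    return exc_type(message)


error = to_exception("RateLimit", "slow down", retry_after=2.5)
```

Callers can then catch LLMError to handle every provider failure, or a specific subclass when the recovery differs.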

Session Errors (errors.rs:SessionError)

| Rust Variant | Python Equivalent | Notes |
| --- | --- | --- |
| SessionError::NotInitialized | RuntimeError("No orchestrator...") | Python raises generic RuntimeError. |
| SessionError::ConfigMissing | ValueError("Configuration must specify...") | |
| SessionError::AlreadyCompleted | (no equivalent) | Rust-only guard. |
| SessionError::Other | (various RuntimeErrors) | |

Hook Errors (errors.rs:HookError)

| Rust Variant | Python Equivalent | Notes |
| --- | --- | --- |
| HookError::HandlerFailed | (logged, not raised) | Python catches silently. |
| HookError::Timeout | TimeoutError (via asyncio.wait_for) | |
| HookError::Other | (generic Exception) | |

Rust-Only Error Types

| Rust Type | Notes |
| --- | --- |
| AmplifierError | Top-level wrapper enum. Python has no unified equivalent. |
| ToolError | Python represents tool errors as ToolResult(success=False, error={...}). |
| ContextError | Python context managers raise generic exceptions. |
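The ToolError row says that Python reports tool failures as data, not as a raised exception. A minimal sketch of that convention follows. The ToolResult stand-in has only the two fields named above plus an assumed output field, and the keys inside the error dict, the run_tool wrapper, and failing_tool are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ToolResult:
    success: bool
    output: Any = None  # assumed field
    error: Optional[dict[str, Any]] = None


async def run_tool(execute: Callable[[dict[str, Any]], Awaitable[Any]],
                   args: dict[str, Any]) -> ToolResult:
    """Convert an exception from the tool into a failed ToolResult."""
    try:
        output = await execute(args)
    except Exception as exc:
        # The keys "type" and "message" are illustrative, not the real schema.
        return ToolResult(success=False,
                          error={"type": type(exc).__name__, "message": str(exc)})
    return ToolResult(success=True, output=output)


async def failing_tool(args: dict[str, Any]) -> Any:
    raise ValueError("missing query")


failed = asyncio.run(run_tool(failing_tool, {}))
```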

Python-Only Types (Not Ported to Rust)

These types stay as Python by design — they are app-layer concerns, not kernel.

| Python Type | Location | Why Not Ported |
| --- | --- | --- |
| ModuleLoader | loader.py | Module loading/discovery is app-layer. |
| ModuleValidationError | loader.py | Validation framework stays Python. |
| ApprovalSystem | approval.py | App-layer approval policy. |
| DisplaySystem | display.py | App-layer UX. |
| validation/ package | validation/ | Structural + behavioral test framework stays Python. |
| testing module | testing.py | Test utilities (MockTool, TestCoordinator, etc.) stay Python. |
| pytest_plugin | pytest_plugin.py | Pytest integration stays Python. |
| cli | cli.py | CLI entry point stays Python. |

Module Lifecycle Methods

Modules may expose lifecycle methods that the kernel calls at specific points during initialisation.

mount(coordinator, config) — Required

from collections.abc import Awaitable, Callable
from typing import Any

async def mount(coordinator, config: dict[str, Any] | None = None) -> Callable[[], None | Awaitable[None]] | None:
    ...

Module-level free function — no self. Called once per module, in phase order, when the kernel loads and wires the module into the coordinator. At call time the coordinator is partially composed — modules from earlier phases are accessible, but later-phase modules may not yet be present.

Return value — optional cleanup callable:

mount() may return a zero-argument cleanup callable, or None (no cleanup needed).

| Returned value | Kernel behaviour |
| --- | --- |
| None | Silently ignored; no cleanup registered |
| Non-callable (e.g. dict) | Silently ignored; register_cleanup only stores callables |
| Sync callable | Stored; called with no args at teardown. If the call returns a coroutine it is awaited |
| Async callable | Stored; at teardown the coroutine is obtained and awaited via pyo3_async_runtimes::tokio::into_future |

The canonical form is an async def cleanup() nested closure:

async def mount(coordinator, config: dict[str, Any] | None = None):
    client = await MyClient.connect((config or {}).get("url"))
    coordinator.register_capability("my.client", client)

    async def cleanup() -> None:
        await client.aclose()

    return cleanup  # kernel awaits this at session teardown

Cleanup callables are called with no arguments, in reverse registration order (last-mounted module is cleaned up first). Errors in one cleanup callable are logged but do not prevent the remaining callables from running.
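The teardown rules described above (callables only, reverse order, sync or async, failures isolated) can be modelled in pure Python as follows. This is a sketch of the documented behaviour, not the Rust implementation: CleanupRegistry and the example callables are hypothetical names.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable

logger = logging.getLogger("cleanup-sketch")


class CleanupRegistry:
    """Pure-Python model of the documented register_cleanup/cleanup behaviour."""

    def __init__(self) -> None:
        self._cleanups: list[Callable[[], Any]] = []

    def register_cleanup(self, fn: Any) -> None:
        # Non-callables (None, dicts, ...) are silently ignored.
        if callable(fn):
            self._cleanups.append(fn)

    async def cleanup(self) -> None:
        # Reverse registration order: the last-mounted module goes first.
        for fn in reversed(self._cleanups):
            try:
                result = fn()
                if inspect.isawaitable(result):
                    await result  # covers async callables and sync ones returning a coroutine
            except Exception:
                logger.exception("cleanup callable failed; continuing")


order: list[str] = []


async def close_db() -> None:
    order.append("db")


def close_cache() -> None:
    order.append("cache")


def broken() -> None:
    raise RuntimeError("boom")


registry = CleanupRegistry()
for candidate in (close_db, None, broken, close_cache):
    registry.register_cleanup(candidate)
asyncio.run(registry.cleanup())
```

After the run, order holds "cache" then "db": the failing callable in the middle is logged and skipped without stopping the callables that follow it.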

Source reference: bindings/python/src/coordinator/mod.rs::cleanup() and bindings/python/src/coordinator/capabilities.rs::register_cleanup().

on_session_ready(coordinator) — Optional

async def on_session_ready(coordinator) -> None:
    ...

Called after ALL modules across all phases have completed mount(). By the time on_session_ready() runs, the coordinator is fully composed and every contributed tool, hook, and provider is registered.

on_session_ready() Contract

| Property | Value |
| --- | --- |
| Presence | Optional |
| Signature | async def on_session_ready(coordinator) -> None (no config param, no self) |
| Sync check | Must be async; a sync on_session_ready logs a warning and is skipped |
| Return value | Ignored |
| Exceptions | Non-fatal; caught and logged as warnings with exc_info=True |
| Call order | Same as mount() order (phase order, then registration order within a phase) |
| Timing | After all phases complete mount(), before session:fork is emitted |
| Scope | Python-only; see polyglot note below |

Fork behaviour: on_session_ready fires once per session. When a parent session forks a child session (the session:fork event), the child runs its own independent mount wave and its own on_session_ready pass — in the same order as its own module registration. A parent's on_session_ready callbacks do not run again for the child.

Dispatch ordering: Callbacks fire sequentially in module registration order (orchestrator first, then context, then providers, tools, and hooks within each phase in load order). This ordering is stable and guaranteed. Cross-module assumptions built on this ordering are safe.

Failure isolation: A raised exception in one module's on_session_ready is caught, logged as a WARNING with exc_info=True, and does not prevent remaining modules' callbacks from running.

No timeout: The kernel enforces no timeout on on_session_ready callbacks, so a hanging callback hangs the session. The absence is deliberate and is documented here because adding a timeout later would be a breaking change.

Observability event: When an on_session_ready callback fails, the kernel emits a module:on_session_ready_failed event with payload {"module_id": str, "error": str} in addition to the WARNING log. The event matters because failures that are only logged are invisible to observability hooks.
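Taken together, the dispatch rules above (sequential order, sync callbacks skipped, failures isolated and reported, no timeout) amount to a loop like the one sketched here. This models the documented behaviour only: dispatch_session_ready, the (module_id, callback) pairs, and the emit signature are assumptions, not the kernel's API.

```python
import asyncio
import inspect
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("session-ready-sketch")


async def dispatch_session_ready(
    modules: list[tuple[str, Callable[..., Any]]],
    coordinator: Any,
    emit: Callable[[str, dict[str, str]], Awaitable[None]],
) -> None:
    """Call each module's on_session_ready in registration order."""
    for module_id, callback in modules:
        if not inspect.iscoroutinefunction(callback):
            logger.warning("%s: on_session_ready must be async; skipped", module_id)
            continue
        try:
            await callback(coordinator)  # return value ignored; no timeout applied
        except Exception as exc:
            logger.warning("%s: on_session_ready failed", module_id, exc_info=True)
            await emit("module:on_session_ready_failed",
                       {"module_id": module_id, "error": str(exc)})


calls: list[str] = []
events: list[tuple[str, dict[str, str]]] = []


async def ready_a(coordinator: Any) -> None:
    calls.append("a")


async def ready_b(coordinator: Any) -> None:
    raise RuntimeError("peer missing")


def ready_sync(coordinator: Any) -> None:
    calls.append("sync")  # never runs: sync callbacks are skipped


async def ready_c(coordinator: Any) -> None:
    calls.append("c")


async def record_event(name: str, payload: dict[str, str]) -> None:
    events.append((name, payload))


asyncio.run(dispatch_session_ready(
    [("a", ready_a), ("b", ready_b), ("s", ready_sync), ("c", ready_c)],
    coordinator=object(),
    emit=record_event,
))
```

In this run, modules a and c complete, b produces one failure event, and the sync callback is never invoked.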

When to use on_session_ready()

  • Discovering contributions from other modules — read the fully-composed coordinator to inspect tools, hooks, or providers registered by peers.
  • Wiring cross-module dependencies — subscribe to hooks or capabilities that are only present after another module mounts.
  • Eliminating dual-path patterns — avoid the anti-pattern of checking for a capability in both mount() (not yet available) and in a request handler (too late to configure).

Before on_session_ready (dual-path anti-pattern) — process-scoped global, unsafe for multi-session deployments:

# Before (anti-pattern) — process-scoped global, unsafe for multi-session deployments
_coordinator = None  # ← shared across ALL sessions in the process

async def mount(coordinator, config: dict | None = None) -> None:
    global _coordinator
    _coordinator = coordinator

async def handle_request(request):
    if _coordinator and _coordinator.get("tools", {}).get("search"):
        result = await _coordinator.get("tools")["search"].execute(**request.params)

After on_session_ready (correct) — session-scoped via closure, safe for multi-session deployments:

# After (correct) — session-scoped via closure, safe for multi-session deployments
async def mount(coordinator, config: dict | None = None) -> None:
    pass  # nothing cross-module to wire here

async def on_session_ready(coordinator) -> None:
    # Session-scoped: register_capability stores reference per-session on the coordinator
    tools = coordinator.get("tools") or {}
    search_tool = tools.get("search")
    if search_tool:
        coordinator.register_capability("my_module.search_tool", search_tool)

async def handle_request(coordinator, request):
    search_tool = coordinator.get_capability("my_module.search_tool")
    if search_tool:
        result = await search_tool.execute(**request.params)

Cleanup from on_session_ready(): on_session_ready()'s return value is ignored. If a resource allocated in on_session_ready() needs teardown, register the cleanup directly: coordinator.register_cleanup(my_async_cleanup_fn).

Canonical use case: event subscription after full composition

# Canonical on_session_ready pattern: event discovery + subscription
# (from hooks-logging, the primary motivating adopter)

async def mount(coordinator, config: dict | None = None):
    pass  # store config-derived state if needed

async def on_session_ready(coordinator) -> None:
    # All modules mounted — event contributions are complete
    all_events = set(ALL_EVENTS)  # core canonical events
    contributions = await coordinator.collect_contributions("observability.events")
    for event_list in contributions:
        all_events.update(event_list)
    # delegate:agent_spawned, delegate:agent_completed are now guaranteed present
    for event in all_events:
        coordinator.hooks.register(event, my_handler, name="my-module")

Polyglot note

on_session_ready() is Python-only. WASM, gRPC, and native Rust modules do not participate in the on_session_ready() wave. If a polyglot module needs post-composition behaviour, defer it to a request-time check or emit a custom event that Python modules can subscribe to.


Event Constants

Canonical event names are defined in crates/amplifier-core/src/events.rs (Rust) and re-exported in amplifier_core.events (Python). The full set contains 41+ events; the table below lists the most commonly referenced ones. Always use the constants from the events module rather than hard-coding strings.

| Event | Value |
| --- | --- |
| Execution start | "execution:start" |
| Execution end | "execution:end" |
| Session start | "session:start" |
| Session end | "session:end" |
| LLM request | "llm:request" |
| LLM response | "llm:response" |
| Tool pre | "tool:pre" |
| Tool post | "tool:post" |
| Context compaction | "context:compaction" |

Rules for Modifying Shared Types

  1. Field names must match. If you add a field to a Rust struct, add the identical field to the Python BaseModel (and vice versa).

  2. Enum variants must match. Rust snake_case serde names = Python Literal string values.

  3. JSON is the contract. Both sides must produce identical JSON for the same logical value. Test with round-trip serialization.

  4. Update this document. Any change to a shared type must be reflected here. CI will eventually enforce this.

  5. Method names must match for PyO3-bridged types (Session, Coordinator, HookRegistry, CancellationToken). The Python-visible name is set by #[pyclass(name = "...")] and #[pymethods].
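Rule 3 asks for round-trip tests. A minimal Python-side sketch is shown below, with a dataclass standing in for the Pydantic model. The Usage field names, the canonical helper, and round_trips are hypothetical, and the input string represents JSON captured from the Rust side.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class Usage:
    input_tokens: int  # hypothetical field names
    output_tokens: int


def canonical(value: Any) -> str:
    """Key-sorted, whitespace-free JSON so equal values compare equal as strings."""
    return json.dumps(value, sort_keys=True, separators=(",", ":"))


def round_trips(wire_json: str) -> bool:
    """Parse Rust-shaped JSON into the model, dump it, and compare canonical forms."""
    parsed = json.loads(wire_json)
    model = Usage(**parsed)  # raises TypeError on a missing or unexpected key
    return canonical(asdict(model)) == canonical(parsed)


ok = round_trips('{"output_tokens": 7, "input_tokens": 12}')
```

Comparing canonical forms keeps the test insensitive to key order and whitespace, which JSON does not treat as significant.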