Delivering live events from Weavegraph

May 24, 2026 ยท View on GitHub

Weavegraph exposes three convenience entry points on App and one shared subscription type, EventStream. Pick the helper based on where the events need to go: a flume receiver, a custom sink list, or a broadcast stream that feeds SSE or WebSocket code.

Which helper to reach for

If you need...Call this APIWhat you get backTypical fit
a quick receiver in a CLI or test harnessApp::invoke_with_channel(Result<VersionedState, RunnerError>, flume::Receiver<Event>)progress output, smoke tests, ad-hoc scripts
extra sinks in addition to the runtime defaultsApp::invoke_with_sinksResult<VersionedState, RunnerError>stdout + JSONL, memory capture, metrics fan-out
a background run plus a first-class subscriptionApp::invoke_streaming(InvocationHandle, EventStream)SSE, WebSocket, long-lived dashboards
total control over session lifetimeAppRunner::builder()your own runner and your own busiterative sessions, custom checkpointing, bespoke wiring

invoke_with_channel

invoke_with_channel is the smallest streaming surface. The method builds an EventBus from RuntimeConfig, appends a ChannelSink, starts the workflow, and hands the caller the paired receiver. Use it when a single consumer is enough and you do not need to manage EventStream directly.

use tokio::time::{timeout, Duration};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let seed = VersionedState::new_with_user_message("compile daily report");
let (result, rx) = app.invoke_with_channel(seed).await;
let collector = tokio::spawn(async move {
    while let Ok(event) = timeout(Duration::from_millis(250), rx.recv_async()).await.unwrap_or_else(|_| Err(flume::RecvError::Disconnected)) {
        println!("{} :: {}", event.scope_label().unwrap_or("unknown"), event.message());
    } // receiver loop
}); // spawned collector
let finished = result?;
collector.await?;
assert!(!finished.snapshot().messages.is_empty());
# Ok(()) }

A few practical notes:

  • The workflow still runs to completion even if the receiver is slow.
  • If the receiver disappears, ChannelSink reports a broken pipe and the bus keeps serving any remaining sinks.
  • The result half resolves to the final VersionedState, so this helper still works for one-shot command-line tools.

invoke_with_sinks

invoke_with_sinks keeps the sinks already described by RuntimeConfig.event_bus and appends the sinks you pass in. This is the helper to use when one invocation needs multiple outputs at once.

use weavegraph::event_bus::{ChannelSink, JsonLinesSink, StdOutSink};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = flume::unbounded();
let state = VersionedState::new_with_user_message("refresh cache");
let _final_state = app.invoke_with_sinks(
    state,
    vec![
        Box::new(StdOutSink::default()),
        Box::new(JsonLinesSink::to_stdout()),
        Box::new(ChannelSink::new(tx)),
    ],
).await?;
while let Ok(event) = rx.try_recv() {
    eprintln!("mirrored event: {}", event.message());
}
# Ok(()) }

This helper is useful when you want a plain-text operator feed, a structured log sink, and a receiver for application code without building AppRunner by hand.

invoke_streaming

invoke_streaming is the preferred interface for web transports. It creates a fresh EventBus, subscribes an EventStream, spawns the workflow in the background, and returns an InvocationHandle that can be awaited or aborted.

use futures_util::StreamExt as _;
use weavegraph::event_bus::{Event, STREAM_END_SCOPE};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let seed = VersionedState::new_with_user_message("stream this run");
let (handle, events) = app.invoke_streaming(seed).await;
let reader = tokio::spawn(async move {
    let mut feed = events.into_async_stream();
    loop {
        match feed.next().await {
            Some(Event::Diagnostic(marker)) if marker.scope() == STREAM_END_SCOPE => break,
            Some(event) => println!("event => {}", event.message()),
            None => break,
        } // next event branch
    } // async stream loop
}); // spawned reader
let final_state = handle.join().await?;
reader.await?;
assert!(final_state.snapshot().messages.len() >= 1);
# Ok(()) }

Two lifecycle rules matter here:

  • Dropping the InvocationHandle aborts the background workflow task.
  • Dropping only the EventStream does not stop the run; call abort() or join() on the handle when the client disconnects. The sentinel diagnostic with scope STREAM_END_SCOPE is the clean end-of-stream marker. The runtime emits it before closing the channel so HTTP code can flush a final frame and then terminate the connection intentionally.

EventStream operations

EventStream wraps a Tokio broadcast receiver and exposes several consumption styles.

MethodWhat it doesGood for
recv().awaitwait for the next event or a lag/closed errordirect async loops
try_recv()poll without blockingmanual event pumps
into_async_stream()produce a boxed futures stream and silently skip lagged slotsSSE or WebSocket adapters
into_blocking_iter()iterate from synchronous codeCLI bridges or thread-based tools
next_timeout(duration).awaitwait up to a deadline and skip lag noticesperiodic polling loops
with_shutdown(watch_rx)stop the stream when external shutdown flips to trueserver-managed cancellation
Lag handling is built into the hub rather than the consumer. When a receiver falls behind, the hub records dropped messages, logs a warning, and keeps the producer side moving.

Server-Sent Events shape

examples/production_streaming.rs shows the full Axum integration, but the essential flow is short: start the workflow, map each Event into an SSE frame, and stop when STREAM_END_SCOPE arrives.

use axum::response::sse::{Event as AxumSseFrame, Sse};
use futures_util::StreamExt as _;
use weavegraph::event_bus::{Event, STREAM_END_SCOPE};
# async fn sse(app: weavegraph::app::App, initial: weavegraph::state::VersionedState) {
let (handle, events) = app.invoke_streaming(initial).await;
let sse_stream = events.into_async_stream().map(|event| {
    let _closing_frame = matches!(&event, Event::Diagnostic(marker) if marker.scope() == STREAM_END_SCOPE);
    Ok::<_, std::convert::Infallible>(AxumSseFrame::default().json_data(event).expect("serialize event"))
}); // SSE frame mapper
let join_task = tokio::spawn(async move {
    match handle.join().await {
        Ok(_) => {}
        Err(err) => tracing::error!("workflow join failed: {err}"),
    } // join outcome
}); // spawned join task
let _ = join_task;
let _response = Sse::new(sse_stream);
# }

If you need to cancel when the client vanishes, keep the handle in shared state and call abort() from the disconnect path, just like the production example does.

Buffer sizing and diagnostics

The broadcast side is configured through RuntimeConfig and EventBusConfig.

use weavegraph::runtimes::{DiagnosticsConfig, EventBusConfig, RuntimeConfig, SinkConfig};
let runtime = RuntimeConfig::default().with_event_bus(
    EventBusConfig::new(2048, vec![SinkConfig::StdOut]).with_diagnostics(DiagnosticsConfig {
        enabled: true,
        buffer_capacity: Some(512),
        emit_to_events: false,
    })
);
let _ = runtime;

What these settings change:

  • buffer_capacity sets the size of the broadcast ring used by EventHub.
  • diagnostics capacity can match or diverge from the main event buffer.
  • emit_to_events controls whether sink failures stay isolated in DiagnosticsStream or are also mirrored into the primary event feed. At runtime you can inspect sink status with EventBus::sink_health() and subscribe to failure notifications with EventBus::diagnostics(). That is the supported way to watch sink health without polluting the normal client stream.

Event payload families

Every helper above ultimately delivers the same event_bus::Event enum.

  • Event::Node carries structured node output emitted through NodeContext::emit.
  • Event::Diagnostic carries framework markers, including STREAM_END_SCOPE and INVOCATION_END_SCOPE.
  • Event::LLM carries chunk, final, and error events produced by emit_llm_chunk, emit_llm_final, or emit_llm_error. Because every variant exposes scope_label() and message(), many consumers can stay generic and only branch when they need variant-specific metadata.

When to drop down to AppRunner

The convenience methods cover the common one-shot cases. Use AppRunner::builder() when you need any of the following:

  • an explicitly supplied EventBus,
  • a custom or preconnected Checkpointer,
  • iterative sessions driven by invoke_next,
  • a runner that survives across multiple logical inputs. That lower-level API is the same engine used underneath the convenience helpers, so moving down a level does not change event semantics; it only exposes more of the wiring.

Where to explore next