Delivering live events from Weavegraph
May 24, 2026 ยท View on GitHub
- Use ARCHITECTURE.md for the larger runtime map behind these helpers.
- Use OPERATIONS.md when you need deployment, persistence, or troubleshooting guidance.
- Runnable samples live in examples/convenience_streaming.rs, examples/streaming_events.rs, and examples/production_streaming.rs.
Weavegraph exposes three convenience entry points on App and one shared subscription type, EventStream. Pick the helper based on where the events need to go: a flume receiver, a custom sink list, or a broadcast stream that feeds SSE or WebSocket code.
Which helper to reach for
| If you need... | Call this API | What you get back | Typical fit |
|---|---|---|---|
| a quick receiver in a CLI or test harness | App::invoke_with_channel | (Result<VersionedState, RunnerError>, flume::Receiver<Event>) | progress output, smoke tests, ad-hoc scripts |
| extra sinks in addition to the runtime defaults | App::invoke_with_sinks | Result<VersionedState, RunnerError> | stdout + JSONL, memory capture, metrics fan-out |
| a background run plus a first-class subscription | App::invoke_streaming | (InvocationHandle, EventStream) | SSE, WebSocket, long-lived dashboards |
| total control over session lifetime | AppRunner::builder() | your own runner and your own bus | iterative sessions, custom checkpointing, bespoke wiring |
invoke_with_channel
invoke_with_channel is the smallest streaming surface. The method builds an EventBus from RuntimeConfig, appends a ChannelSink, starts the workflow, and hands the caller the paired receiver.
Use it when a single consumer is enough and you do not need to manage EventStream directly.
use tokio::time::{timeout, Duration};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let seed = VersionedState::new_with_user_message("compile daily report");
let (result, rx) = app.invoke_with_channel(seed).await;
let collector = tokio::spawn(async move {
while let Ok(event) = timeout(Duration::from_millis(250), rx.recv_async()).await.unwrap_or_else(|_| Err(flume::RecvError::Disconnected)) {
println!("{} :: {}", event.scope_label().unwrap_or("unknown"), event.message());
} // receiver loop
}); // spawned collector
let finished = result?;
collector.await?;
assert!(!finished.snapshot().messages.is_empty());
# Ok(()) }
A few practical notes:
- The workflow still runs to completion even if the receiver is slow.
- If the receiver disappears,
ChannelSinkreports a broken pipe and the bus keeps serving any remaining sinks. - The result half resolves to the final
VersionedState, so this helper still works for one-shot command-line tools.
invoke_with_sinks
invoke_with_sinks keeps the sinks already described by RuntimeConfig.event_bus and appends the sinks you pass in. This is the helper to use when one invocation needs multiple outputs at once.
use weavegraph::event_bus::{ChannelSink, JsonLinesSink, StdOutSink};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = flume::unbounded();
let state = VersionedState::new_with_user_message("refresh cache");
let _final_state = app.invoke_with_sinks(
state,
vec![
Box::new(StdOutSink::default()),
Box::new(JsonLinesSink::to_stdout()),
Box::new(ChannelSink::new(tx)),
],
).await?;
while let Ok(event) = rx.try_recv() {
eprintln!("mirrored event: {}", event.message());
}
# Ok(()) }
This helper is useful when you want a plain-text operator feed, a structured log sink, and a receiver for application code without building AppRunner by hand.
invoke_streaming
invoke_streaming is the preferred interface for web transports. It creates a fresh EventBus, subscribes an EventStream, spawns the workflow in the background, and returns an InvocationHandle that can be awaited or aborted.
use futures_util::StreamExt as _;
use weavegraph::event_bus::{Event, STREAM_END_SCOPE};
use weavegraph::state::VersionedState;
# async fn sample(app: weavegraph::app::App) -> Result<(), Box<dyn std::error::Error>> {
let seed = VersionedState::new_with_user_message("stream this run");
let (handle, events) = app.invoke_streaming(seed).await;
let reader = tokio::spawn(async move {
let mut feed = events.into_async_stream();
loop {
match feed.next().await {
Some(Event::Diagnostic(marker)) if marker.scope() == STREAM_END_SCOPE => break,
Some(event) => println!("event => {}", event.message()),
None => break,
} // next event branch
} // async stream loop
}); // spawned reader
let final_state = handle.join().await?;
reader.await?;
assert!(final_state.snapshot().messages.len() >= 1);
# Ok(()) }
Two lifecycle rules matter here:
- Dropping the
InvocationHandleaborts the background workflow task. - Dropping only the
EventStreamdoes not stop the run; callabort()orjoin()on the handle when the client disconnects. The sentinel diagnostic with scopeSTREAM_END_SCOPEis the clean end-of-stream marker. The runtime emits it before closing the channel so HTTP code can flush a final frame and then terminate the connection intentionally.
EventStream operations
EventStream wraps a Tokio broadcast receiver and exposes several consumption styles.
| Method | What it does | Good for |
|---|---|---|
recv().await | wait for the next event or a lag/closed error | direct async loops |
try_recv() | poll without blocking | manual event pumps |
into_async_stream() | produce a boxed futures stream and silently skip lagged slots | SSE or WebSocket adapters |
into_blocking_iter() | iterate from synchronous code | CLI bridges or thread-based tools |
next_timeout(duration).await | wait up to a deadline and skip lag notices | periodic polling loops |
with_shutdown(watch_rx) | stop the stream when external shutdown flips to true | server-managed cancellation |
| Lag handling is built into the hub rather than the consumer. When a receiver falls behind, the hub records dropped messages, logs a warning, and keeps the producer side moving. |
Server-Sent Events shape
examples/production_streaming.rs shows the full Axum integration, but the essential flow is short: start the workflow, map each Event into an SSE frame, and stop when STREAM_END_SCOPE arrives.
use axum::response::sse::{Event as AxumSseFrame, Sse};
use futures_util::StreamExt as _;
use weavegraph::event_bus::{Event, STREAM_END_SCOPE};
# async fn sse(app: weavegraph::app::App, initial: weavegraph::state::VersionedState) {
let (handle, events) = app.invoke_streaming(initial).await;
let sse_stream = events.into_async_stream().map(|event| {
let _closing_frame = matches!(&event, Event::Diagnostic(marker) if marker.scope() == STREAM_END_SCOPE);
Ok::<_, std::convert::Infallible>(AxumSseFrame::default().json_data(event).expect("serialize event"))
}); // SSE frame mapper
let join_task = tokio::spawn(async move {
match handle.join().await {
Ok(_) => {}
Err(err) => tracing::error!("workflow join failed: {err}"),
} // join outcome
}); // spawned join task
let _ = join_task;
let _response = Sse::new(sse_stream);
# }
If you need to cancel when the client vanishes, keep the handle in shared state and call abort() from the disconnect path, just like the production example does.
Buffer sizing and diagnostics
The broadcast side is configured through RuntimeConfig and EventBusConfig.
use weavegraph::runtimes::{DiagnosticsConfig, EventBusConfig, RuntimeConfig, SinkConfig};
let runtime = RuntimeConfig::default().with_event_bus(
EventBusConfig::new(2048, vec![SinkConfig::StdOut]).with_diagnostics(DiagnosticsConfig {
enabled: true,
buffer_capacity: Some(512),
emit_to_events: false,
})
);
let _ = runtime;
What these settings change:
buffer_capacitysets the size of the broadcast ring used byEventHub.- diagnostics capacity can match or diverge from the main event buffer.
emit_to_eventscontrols whether sink failures stay isolated inDiagnosticsStreamor are also mirrored into the primary event feed. At runtime you can inspect sink status withEventBus::sink_health()and subscribe to failure notifications withEventBus::diagnostics(). That is the supported way to watch sink health without polluting the normal client stream.
Event payload families
Every helper above ultimately delivers the same event_bus::Event enum.
Event::Nodecarries structured node output emitted throughNodeContext::emit.Event::Diagnosticcarries framework markers, includingSTREAM_END_SCOPEandINVOCATION_END_SCOPE.Event::LLMcarries chunk, final, and error events produced byemit_llm_chunk,emit_llm_final, oremit_llm_error. Because every variant exposesscope_label()andmessage(), many consumers can stay generic and only branch when they need variant-specific metadata.
When to drop down to AppRunner
The convenience methods cover the common one-shot cases. Use AppRunner::builder() when you need any of the following:
- an explicitly supplied
EventBus, - a custom or preconnected
Checkpointer, - iterative sessions driven by
invoke_next, - a runner that survives across multiple logical inputs. That lower-level API is the same engine used underneath the convenience helpers, so moving down a level does not change event semantics; it only exposes more of the wiring.
Where to explore next
- examples/convenience_streaming.rs demonstrates the channel and multi-sink helpers.
- examples/streaming_events.rs shows the generic
EventStreampattern. - examples/production_streaming.rs shows SSE plus checkpoint-backed execution.
- ARCHITECTURE.md explains how the bus, scheduler, reducers, and checkpointers fit together.