Sensor Streams

May 9, 2026

Dimos uses reactive streams (RxPY) to handle sensor data. This model suits robotics, where multiple sensors emit data asynchronously at different rates and downstream processors may be slower than the data sources.
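
To make the slow-consumer problem concrete, here is a minimal sketch in plain Python (no RxPY) of one common policy: when the consumer becomes free, it takes the newest sample that has arrived and skips the older ones. The function name `simulate_latest_only` and the integer-millisecond timestamps are assumptions made for illustration; this section does not say whether Dimos's own `backpressure` helper follows this exact policy.

```python
def simulate_latest_only(sample_times_ms, process_time_ms):
    """Return the timestamps a slow consumer handles under a latest-only policy.

    sample_times_ms: ascending arrival times of the samples, in milliseconds.
    process_time_ms: how long the consumer needs for each sample.
    """
    handled = []
    free_at = 0  # time at which the consumer can start its next sample
    i = 0
    n = len(sample_times_ms)
    while i < n:
        # The consumer starts when it is free and a sample is available.
        start = max(free_at, sample_times_ms[i])
        # Skip ahead to the newest sample that has arrived by then.
        while i + 1 < n and sample_times_ms[i + 1] <= start:
            i += 1
        handled.append(sample_times_ms[i])
        free_at = start + process_time_ms
        i += 1
    return handled


# Roughly 30 fps source (one sample every 33 ms), consumer needs 100 ms each.
camera_times = [k * 33 for k in range(10)]
print(simulate_latest_only(camera_times, 100))  # [0, 99, 198, 297]
```

The consumer ends up processing about every third sample, and none of the samples it handles is older than the newest one available when it started.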

Guides

| Guide | Description |
| --- | --- |
| ReactiveX Fundamentals | Observables, subscriptions, and disposables |
| Advanced Streams | Backpressure, parallel subscribers, synchronous getters |
| Quality-Based Filtering | Select highest quality frames when downsampling streams |
| Temporal Alignment | Match messages from multiple sensors by timestamp |
| Storage & Replay | Record sensor streams to disk and replay with original timing |
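
As a rough illustration of what quality-based filtering means when downsampling, the sketch below keeps the highest-scoring frame in each fixed time window. It is plain Python and not the Dimos implementation: the `Frame` class, its `sharpness` field, and the function `sharpest_per_window` are hypothetical names, and timestamps are integer milliseconds to avoid floating-point edge cases.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Frame:
    ts_ms: int        # capture time in milliseconds (assumed unit)
    sharpness: float  # higher is sharper (assumed quality score)


def sharpest_per_window(frames, target_hz):
    """Keep the sharpest frame from each 1/target_hz window."""
    window_ms = round(1000 / target_hz)
    best = {}  # window index -> best frame seen so far in that window
    for frame in frames:
        idx = frame.ts_ms // window_ms
        if idx not in best or frame.sharpness > best[idx].sharpness:
            best[idx] = frame
    # Emit one frame per window, in time order.
    return [best[idx] for idx in sorted(best)]


frames = [
    Frame(0, 0.2), Frame(33, 0.9), Frame(66, 0.5),     # window 0
    Frame(100, 0.4), Frame(133, 0.3), Frame(166, 0.8),  # window 1
]
kept = sharpest_per_window(frames, target_hz=10)
print([f.ts_ms for f in kept])  # [33, 166]
```

Compared with taking every third frame, this reduces the rate to the same 10 Hz but prefers the frames least affected by blur.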

Quick Example

from reactivex import operators as ops
from dimos.utils.reactive import backpressure
from dimos.types.timestamped import align_timestamped
from dimos.msgs.sensor_msgs.Image import sharpness_barrier

# Camera at 30fps, lidar at 10Hz
camera_stream = camera.observable()
lidar_stream = lidar.observable()

# Pipeline: filter blurry frames -> handle slow consumers -> align with lidar
processed = (
    camera_stream.pipe(
        sharpness_barrier(10.0),  # Keep sharpest frame per 100ms window (10Hz)
    )
)

aligned = align_timestamped(
    backpressure(processed),     # Camera as primary
    lidar_stream,                # Lidar as secondary
    match_tolerance=0.1,
)

aligned.subscribe(lambda pair: process_frame_with_pointcloud(*pair))
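
The alignment step in the example pairs each camera frame with a lidar message by timestamp. The following is a minimal offline sketch of nearest-timestamp matching with a tolerance, written in plain Python. It is not the implementation of `align_timestamped`: the function `align_nearest`, the list-based inputs, and the millisecond units are assumptions chosen to keep the example self-contained.

```python
from bisect import bisect_left


def align_nearest(primary_ts, secondary_ts, tolerance):
    """Pair each primary timestamp with the nearest secondary timestamp.

    secondary_ts must be sorted ascending. A primary timestamp with no
    secondary within `tolerance` is dropped.
    """
    pairs = []
    for t in primary_ts:
        i = bisect_left(secondary_ts, t)
        # Only the neighbours just before and at/after t can be nearest.
        candidates = secondary_ts[max(i - 1, 0): i + 1]
        if not candidates:
            continue
        nearest = min(candidates, key=lambda s: abs(s - t))
        if abs(nearest - t) <= tolerance:
            pairs.append((t, nearest))
    return pairs


camera_ts = [33, 166, 420]     # filtered camera frames (ms)
lidar_ts = [0, 100, 200, 300]  # 10 Hz lidar (ms)
print(align_nearest(camera_ts, lidar_ts, tolerance=50))
# [(33, 0), (166, 200)]
```

The frame at 420 ms is dropped because the closest lidar message is 120 ms away, which exceeds the tolerance. A streaming version would also need to buffer recent secondary messages, which this sketch leaves out.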