Observable Subscriptions in rclnodejs

December 18, 2025 ยท View on GitHub

rclnodejs provides RxJS Observable support for subscriptions, enabling reactive programming patterns when working with ROS 2 messages.

Table of Contents

What are Observable Subscriptions?

Observable Subscriptions wrap standard ROS 2 subscriptions and expose messages through RxJS Observables. This allows you to use the full power of reactive programming for:

  • ๐Ÿ”„ Rate limiting: throttleTime(), debounceTime(), auditTime()
  • ๐Ÿ”ง Transformation: map(), scan(), buffer()
  • ๐Ÿ” Filtering: filter(), distinctUntilChanged(), take()
  • ๐Ÿ”— Combining topics: combineLatest(), merge(), zip()
  • โš ๏ธ Error handling: catchError(), retry(), timeout()

Observable subscriptions are ideal for complex message processing pipelines where you need to combine, filter, or transform data from multiple sources.

Note: RxJS filter() operates at the application level after messages are received. For simple content-based filtering that reduces network traffic, consider using DDS Content Filtering instead, which filters at the middleware level before messages reach your application. See Example 6 for combined usage.

Basic Usage

const rclnodejs = require('rclnodejs');
const { throttleTime, map, filter } = require('rxjs');

async function main() {
  await rclnodejs.init();
  const node = new rclnodejs.Node('observable_example');

  // Create an Observable subscription
  const obsSub = node.createObservableSubscription(
    'sensor_msgs/msg/LaserScan',
    '/scan'
  );

  // Use RxJS operators to process messages
  obsSub.observable
    .pipe(
      throttleTime(200), // Limit to 5 Hz
      map((msg) => msg.ranges),
      filter((ranges) => ranges.length > 0)
    )
    .subscribe((ranges) => {
      console.log('Received ranges:', ranges.length);
    });

  node.spin();
}

main();

API Reference

node.createObservableSubscription(typeClass, topic, options?, eventCallbacks?)

Creates a subscription that returns an ObservableSubscription.

Parameters:

  • typeClass - Message type (string, object, or class)
  • topic - Topic name
  • options - Optional subscription options (same as createSubscription())
  • eventCallbacks - Optional event callbacks

Returns: ObservableSubscription

ObservableSubscription

Property/MethodTypeDescription
observableObservable<T>RxJS Observable that emits messages
subscriptionSubscriptionUnderlying ROS 2 subscription
topicstringTopic name
isDestroyedbooleanWhether the observable has been completed
complete()voidComplete the observable and stop emitting
destroy()voidAlias for complete()

Examples

Example 1: Throttling High-Frequency Sensors

const { throttleTime } = require('rxjs');

// LiDAR publishes at 20Hz, but we only need 5Hz for visualization
const lidarSub = node.createObservableSubscription(
  'sensor_msgs/msg/LaserScan',
  '/scan'
);

lidarSub.observable
  .pipe(throttleTime(200)) // 200ms = 5Hz
  .subscribe((scan) => {
    visualize(scan);
  });

Example 2: Combining Multiple Topics

const { combineLatest, map } = require('rxjs');

const odomSub = node.createObservableSubscription(
  'nav_msgs/msg/Odometry',
  '/odom'
);
const imuSub = node.createObservableSubscription('sensor_msgs/msg/Imu', '/imu');

combineLatest([odomSub.observable, imuSub.observable])
  .pipe(
    map(([odom, imu]) => ({
      position: odom.pose.pose.position,
      orientation: imu.orientation,
    }))
  )
  .subscribe((combined) => {
    console.log('Combined data:', combined);
  });

Example 3: Debouncing Burst Events

const { debounceTime } = require('rxjs');

// Joystick commands may come in bursts - only act on the final value
const joySub = node.createObservableSubscription('sensor_msgs/msg/Joy', '/joy');

joySub.observable
  .pipe(debounceTime(50)) // Wait 50ms of quiet before processing
  .subscribe((joy) => {
    processJoystickCommand(joy);
  });

Example 4: Sampling Every Nth Message

const { filter } = require('rxjs');

// IMU at 100Hz - sample every 10th message for logging
let count = 0;
const imuSub = node.createObservableSubscription('sensor_msgs/msg/Imu', '/imu');

imuSub.observable.pipe(filter(() => ++count % 10 === 0)).subscribe((imu) => {
  logToFile(imu);
});

Example 5: Buffering Messages

const { bufferTime, filter } = require('rxjs');

const tempSub = node.createObservableSubscription(
  'sensor_msgs/msg/Temperature',
  '/temperature'
);

// Collect messages over 1 second, then process as batch
tempSub.observable
  .pipe(
    bufferTime(1000),
    filter((batch) => batch.length > 0)
  )
  .subscribe((batch) => {
    const avgTemp =
      batch.reduce((sum, msg) => sum + msg.temperature, 0) / batch.length;
    console.log('Average temperature:', avgTemp);
  });

Example 6: Combining DDS Content Filtering with RxJS

For optimal performance, use DDS content filtering to reduce network traffic at the middleware level, then apply RxJS operators for additional processing:

const { throttleTime, map } = require('rxjs');

// DDS filters at middleware level - only temperatures > 30ยฐC are delivered
const tempSub = node.createObservableSubscription(
  'sensor_msgs/msg/Temperature',
  '/temperature',
  {
    contentFilter: {
      expression: 'temperature > %0',
      parameters: ['30.0'],
    },
  }
);

// RxJS processes the pre-filtered stream
tempSub.observable
  .pipe(
    throttleTime(1000), // Rate limit to 1 msg/sec
    map((msg) => ({
      celsius: msg.temperature,
      fahrenheit: msg.temperature * 1.8 + 32,
    }))
  )
  .subscribe((temp) => {
    console.log(`High temp alert: ${temp.celsius}ยฐC (${temp.fahrenheit}ยฐF)`);
  });

This approach provides:

  • Network efficiency: DDS drops messages below 30ยฐC before transmission
  • CPU efficiency: RxJS only processes relevant messages
  • Flexibility: RxJS handles rate limiting and transformation

See Content Filtering Subscription for more details on DDS content filtering.

Cleanup

Always clean up subscriptions when done:

// Option 1: Complete the observable
obsSub.complete();

// Option 2: Destroy via node
node.destroySubscription(obsSub.subscription);

// Option 3: Destroy the entire node
node.destroy();

Best Practices

1. Always Unsubscribe

Prevent memory leaks by unsubscribing when done:

const rxjsSubscription = obsSub.observable
  .pipe(throttleTime(100))
  .subscribe((msg) => console.log(msg));

// Later, when cleanup is needed:
rxjsSubscription.unsubscribe();
obsSub.complete();

2. Use take() for Finite Streams

const { take } = require('rxjs');

// Only process the first 10 messages
obsSub.observable.pipe(take(10)).subscribe({
  next: (msg) => console.log(msg),
  complete: () => console.log('Received 10 messages'),
});

3. Handle Errors Gracefully

const { catchError } = require('rxjs');
const { of } = require('rxjs');

obsSub.observable
  .pipe(
    map((msg) => processMessage(msg)),
    catchError((err) => {
      console.error('Processing error:', err);
      return of(null); // Continue with null on error
    }),
    filter((result) => result !== null)
  )
  .subscribe((result) => {
    console.log('Processed:', result);
  });

4. Combine with Content Filtering

For maximum efficiency, combine RxJS operators with DDS-level content filtering:

const { map } = require('rxjs');

// DDS filters at middleware level (reduces network traffic)
const obsSub = node.createObservableSubscription(
  'sensor_msgs/msg/Temperature',
  '/temperature',
  {
    contentFilter: {
      expression: 'temperature > %0',
      parameters: [50.0],
    },
  }
);

// RxJS operators for additional processing
obsSub.observable
  .pipe(
    map((msg) => ({ temp: msg.temperature, critical: msg.temperature > 80 }))
  )
  .subscribe((data) => {
    if (data.critical) {
      console.warn('Critical temperature:', data.temp);
    }
  });

Running the Examples

The rclnodejs repository includes an Observable subscription example in example/topics/subscriber/.

Run the Built-in Example

# Terminal 1 - Start a publisher
node example/topics/publisher/publisher-example.js

# Terminal 2 - Run the observable subscription example
node example/topics/subscriber/subscription-observable-example.js

Expected Output

Observable subscription created on /topic
Run: ros2 topic pub /topic std_msgs/msg/String "{data: Hello ROS}" -r 5
[Throttled] Hello ROS 2 from rclnodejs
[Filtered] Hello ROS 2 from rclnodejs
[Throttled] Hello ROS 2 from rclnodejs
[Filtered] Hello ROS 2 from rclnodejs
[Batched] 3 messages
[Filtered] Hello ROS 2 from rclnodejs
[Throttled] Hello ROS 2 from rclnodejs
[Filtered] Hello ROS 2 from rclnodejs
[Batched] 3 messages
  • [Throttled] โ€” Rate limited to ~2 messages/second via throttleTime(500)
  • [Filtered] โ€” Only messages containing "ROS" (all pass in this case)
  • [Batched] โ€” Emits after every 3 messages via bufferCount(3)

Custom Example

// observable-example.js
const rclnodejs = require('rclnodejs');
const { take, map } = require('rxjs');

async function main() {
  await rclnodejs.init();
  const node = new rclnodejs.Node('observable_demo');

  const obsSub = node.createObservableSubscription(
    'std_msgs/msg/String',
    '/test_topic'
  );

  obsSub.observable
    .pipe(
      take(5),
      map((msg) => msg.data.toUpperCase())
    )
    .subscribe({
      next: (data) => console.log('Received:', data),
      complete: () => {
        console.log('Done - received 5 messages');
        node.destroy();
        rclnodejs.shutdown();
      },
    });

  node.spin();
}

main().catch(console.error);
# Terminal 2 - Run the example
node observable-example.js

Expected Output

Received: HELLO
Received: HELLO
Received: HELLO
Received: HELLO
Received: HELLO
Done - received 5 messages

Comparison with Callback API

FeaturecreateSubscription()createObservableSubscription()
StyleCallback-basedObservable-based
Rate limitingManual implementationVia RxJS operators
Combining topicsManualBuilt-in with RxJS
Learning curveLowerRequires RxJS knowledge
Use caseSimple subscriptionsComplex reactive pipelines
DependenciesNoneRxJS (included)

Both APIs can coexist in the same application. Use the callback-based API for simple cases and the Observable API when you need advanced reactive patterns.


This tutorial is part of the rclnodejs documentation. For more tutorials and examples, visit the rclnodejs GitHub repository.