README.md

May 9, 2026 · View on GitHub

Reactive V (RxV)

vlang.io | Docs | Operators | Tutorials | Contributing

Continuous Integration Deploy Documentation License: MIT

RxV is a ReactiveX implementation for the V programming language — fully generic, built on channels, zero dependencies.


Table of Contents


What is ReactiveX?

ReactiveX is an API for programming with Observable streams. It provides a unified model for handling asynchronous data — whether from user events, HTTP responses, database results, or any other source.

RxV brings this model to V using:

  • Generic ObservablesObservableImpl[T] works with any type
  • Channel-based pipelines — each operator spawns a thread and connects via chan Item[T]
  • Composable operators — filter, map, merge, reduce, and more

Install

v install ulises-jeremias.rxv

Quick Start

import ulises_jeremias.rxv

fn main() {
	// 1. Create a stream of integers
	mut obs := rxv.range(1, 6) // emits 1, 2, 3, 4, 5

	// 2. Keep only even numbers
	mut evens := obs.filter(fn (v int) bool {
		return v % 2 == 0
	})

	// 3. Double each value using map_ (free function — see V Compiler Notes)
	mut doubled := rxv.map_[int, int](mut evens, fn (v int) ?int {
		return v * 2
	})

	// 4. Subscribe and collect results
	done := doubled.for_each(fn (v int) {
		println(v)
	}, fn (e IError) {
		eprintln('error: ${e}')
	}, fn () {
		println('done')
	})
	_ = <-done
	// Output:
	// 4
	// 8
	// done
}

A More Complete Example

import ulises_jeremias.rxv

fn main() {
	// Aggregate: sum all integers from 1 to 10
	mut obs := rxv.range(1, 10)
	mut total := rxv.reduce_[int, int](mut obs, 0, fn (acc int, val int) int {
		return acc + val
	})

	done := total.for_each(fn (v int) {
		println('Sum 1..10 = ${v}')
	}, fn (e IError) {
		eprintln('error: ${e}')
	}, fn () {})
	_ = <-done
	// Output: Sum 1..10 = 55
}

Supported Operators

Creating

OperatorDescription
just[T](items ...T)Emit fixed values
from_slice[T](items []T)Emit items from a slice
from_channel[T](ch)Wrap an existing channel
create[T](producer)Create from a producer function
empty[T]()Complete immediately
throw[T](err)Emit one error and complete
range(start, count)Emit a range of integers
repeat[T](value, count)Emit the same value N times
interval(period_ms)Emit sequential integers periodically
timer(delay_ms)Emit 0 after a delay
defer_[T](factory)Lazily evaluate on each subscription

Filtering

OperatorDescription
.filter(predicate)Keep only matching items
.take(n)Emit at most N items
.skip(n)Skip the first N items
.take_last(n)Emit only the last N items
.first()Emit only the first item
.last()Emit only the last item
.distinct()Suppress all duplicates
.distinct_until_changed()Suppress consecutive duplicates
.timeout(ms)Error if no item within deadline
.contains(pred)Emit true if any item satisfies predicate
.is_empty()Emit true if source completes without items
.element_at(index)Emit item at index or error if out of bounds

Timing (free functions — see V Compiler Notes)

OperatorDescription
debounce_[T](mut o, delay_ms)Emit after silent window
sample[T](mut o, period_ms)Emit most recent at intervals
throttle_first_[T](mut o, delay_ms)Emit first, block until window resets

Transforming (free functions — see V Compiler Notes)

OperatorDescription
map_[T, U](mut o, fn)Transform each item to type U
flat_map_[T, U](mut o, fn)Map each item to an inner observable, merge all
concat_map_[T, U](mut o, fn)Like flat_map_ but sequential

Aggregating (free functions)

OperatorDescription
scan_[T, U](mut o, seed, fn)Emit each intermediate accumulated value
reduce_[T, U](mut o, seed, fn)Emit only the final accumulated value
count_[T](mut o)Emit the total item count

Combining

OperatorDescription
merge[T](mut o1, mut o2)Interleave emissions from two observables
concat[T](observables)Emit all items from each observable in sequence

Mathematical (f64 only)

OperatorDescription
.average_f64()Compute the arithmetic mean
.sum_f64()Compute the sum

Subscribing

OperatorDescription
.observe()Returns the underlying chan Item[T]
.for_each(next, err, done)Subscribe with callbacks

V Compiler Notes

RxV targets V 0.5.x. The current compiler has limitations with certain generic patterns:

Methods cannot have additional type parameters. (mut o ObservableImpl[T]) map[U](fn(T) U) is not supported.

Workaround: Operators that transform to a different type are exposed as free functions named with a _ suffix:

// Instead of obs.map[string](fn(v int) string { ... })
mut labels := rxv.map_[int, string](mut obs, fn (v int) ?string { return v.str() })

// Instead of obs.scan[int](0, accumulator)
mut running := rxv.scan_[int, int](mut obs, 0, fn (acc int, v int) int { return acc + v })

See the full list of compiler workarounds in docs/API.md.


Examples

ExampleDescription
hello_worldMinimal hello world
02-from-slice-and-rangeCreating observables
03-filteringfilter, take, distinct, chaining
04-transforming-mapmap_, flat_map_, concat_map_
05-aggregationscan_, reduce_, count_, average_f64, sum_f64
06-combiningmerge, concat
07-error-handlingthrow, error propagation

Run any example:

v run examples/05-aggregation/main.v

Tutorials

Step-by-step guides in docs/tutorials/:

  1. Hello World
  2. Creating Observables
  3. Filtering
  4. Transforming
  5. Error Handling

API Reference

docs/README.md — documentation home
docs/API.md — full type and function reference
docs/OPERATORS.md — operator reference with examples


Testing

./bin/test

Or run a single test file directly:

cd ~/.vmodules/ulises_jeremias && v run rxv/filter_observe_test.v

Note: Avoid v test rxv — it runs all tests in parallel which can exhaust system resources. Use ./bin/test which runs them sequentially.


Contributing

See CONTRIBUTING.md for the contribution workflow.