PubSubx

March 28, 2026 · View on GitHub

Hex.pm Version License Issues

PubSubx is a lightweight event router for Elixir. It is built for apps that need more than exact pubsub topics but do not want a larger event bus: hierarchical topic patterns, subscriber-side filtering, structured event envelopes, and Telemetry hooks.

Why use it

  • Subscribe with exact topics or wildcard patterns like "orders.*" and "orders.**".
  • Receive %PubSubx.Event{} envelopes with topic, payload, timestamp, metadata, correlation ID, and trace ID.
  • Filter events at the subscriber so a broad subscription can still be selective.
  • Observe subscribe, unsubscribe, publish, delivery, drop, distributed publish, and subscriber-count events through Telemetry.

Installation

If available in Hex, the package can be installed by adding pub_subx to your list of dependencies in mix.exs:

def deps do
  [
    {:pub_subx, "~> 0.2.0"}
  ]
end

API conventions

The public API keeps the same argument order throughout:

  • subscribe(pubsub, topic_pattern, pid, opts \\ [])
  • publish(pubsub, topic, payload, opts \\ [])
  • unsubscribe(pubsub, topic_pattern, pid)

Usage

Add PubSubx to your supervision tree

In your application.ex, start the PubSub server as part of your supervision tree:

def start(_type, _args) do
  children = [
    {MyApp.MyPubSub, []}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

Basic subscribe and publish

Subscriptions are automatically removed when the subscriber process exits, so explicit unsubscribe/3 calls are only needed when you want to stop receiving events before the process terminates.

:ok = MyApp.MyPubSub.subscribe("orders.created", self())

:ok = MyApp.MyPubSub.publish("orders.created", %{id: 1},
  metadata: %{source: :checkout},
  correlation_id: "corr-123",
  trace_id: "trace-123"
)

receive do
  %PubSubx.Event{} = event ->
    IO.inspect(event.topic)
    IO.inspect(event.payload)
end

Subscribe with an optional filter

The :filter option is optional. Use it only when a subscriber wants to accept just a subset of the events that match a topic pattern.

As with exact subscriptions, the subscriber is automatically deregistered if its process exits.

:ok =
  MyApp.MyPubSub.subscribe("orders.*", self(),
    filter: fn event -> event.payload.region == :eu end
  )

:ok =
  MyApp.MyPubSub.publish("orders.created", %{id: 1, region: :eu},
    metadata: %{source: :checkout},
    correlation_id: "corr-123",
    trace_id: "trace-123"
  )

receive do
  %PubSubx.Event{} = event ->
    IO.inspect(event.topic)
    IO.inspect(event.payload)
end

Distributed publish across interconnected nodes

If your Erlang nodes are already connected (eg: libcluster usage), PubSubx.Utils.distribute_publish/4 can fan a publish out to all nodes or selected nodes. Local delivery and node_opts: [:visible, :this] are enabled by default; pass include_local?: false only when you want remote-only fanout. This is best-effort delivery; it does not synchronize subscriptions or wait for acknowledgements. It also relies on the PubSub GenServer being started in each application supervision tree under the same PubSub module name.

:ok = MyApp.MyPubSub.subscribe("orders.created", self())

summary =
  PubSubx.Utils.distribute_publish(MyApp.MyPubSub, "orders.created", %{id: 1},
    publish: [
      metadata: %{source: :cluster},
      correlation_id: "dist-123"
    ]
  )

IO.inspect(summary.attempted_nodes)

# Remote-only fanout:
# PubSubx.Utils.distribute_publish(MyApp.MyPubSub, "orders.created", %{id: 1},
#   include_local?: false
# )

receive do
  %PubSubx.Event{} = event ->
    IO.inspect(event.topic)
    IO.inspect(event.metadata)
end

Topic matching

  • Exact atom topics remain exact-only.
  • Binary topics can be hierarchical: "orders.created", "orders.eu.created".
  • * matches one segment.
  • ** matches zero or more trailing segments and must be the last segment.

Distributed publish

PubSubx.Utils.distribute_publish/4 provides best-effort cross-node fanout. It can include the local node, forwards publish options, and emits [:pub_subx, :distribute, :publish].

It does not:

  • synchronize subscriptions across nodes
  • wait for acknowledgements
  • retry failed remote deliveries

Telemetry

PubSubx emits the following Telemetry events:

  • [:pub_subx, :subscribe]
  • [:pub_subx, :unsubscribe]
  • [:pub_subx, :publish]
  • [:pub_subx, :delivery]
  • [:pub_subx, :drop]
  • [:pub_subx, :subscriber_count]
  • [:pub_subx, :distribute, :publish]

Benchmarks

Benchmark scaffolding lives in bench/pub_subx_bench.exs and compares:

  • PubSubx exact publish
  • PubSubx wildcard publish
  • Phoenix.PubSub exact publish
  • plain Registry exact dispatch

Run it with:

mix run bench/pub_subx_bench.exs

The script starts the phoenix_pubsub application it needs before running the comparison, so the command above is the intended way to execute the benchmark.

Example run on a MacBook Pro 13-inch (Mid 2017, no Touch Bar, two Thunderbolt 3 ports), Intel Core i5-7360U 2.30 GHz, 4 cores, 8 GB RAM:

Name                                   ips        average
pub_subx exact publish            917.28 K        1.09 μs
pub_subx wildcard publish         834.32 K        1.20 μs
registry exact dispatch           459.20 K        2.18 μs
phoenix_pubsub exact publish      266.79 K        3.75 μs

Relative to this run:

  • pub_subx exact publish was 243.82% faster than phoenix_pubsub exact publish
  • pub_subx exact publish was 99.76% faster than registry exact dispatch
  • pub_subx wildcard publish was 212.73% faster than phoenix_pubsub exact publish
  • pub_subx wildcard publish was 81.69% faster than registry exact dispatch
  • pub_subx wildcard publish was 9.94% slower than pub_subx exact publish

This example is illustrative only. Benchmark results will vary with CPU, Elixir/Erlang versions, scheduler behavior, and system load.

Future direction

If repeated event schemas emerge across multiple users of the library, a later release can add typed event helpers or macros on top of the current event envelope. That is intentionally deferred for now.

The docs can be found at https://hexdocs.pm/pub_subx.

License

This project is licensed under the MIT License. See LICENSE for details.