Py Async Bus
February 26, 2021 ยท View on GitHub
A simple async event bus in Python
Installing
Install with pip:
$ pip install py-async-bus
Usage
from async_bus import EventBus
bus = EventBus()
@bus.subscribe('event_name')
async def subscriber(param):
print(param)
bus.emit('event_name', param='test_param')
Disable subscriber
If you want to disable subscribers while testing for example, you can set the env ASYNC_BUS_ENABLE_SUBSCRIBERS to false.
$ ASYNC_BUS_ENABLE_SUBSCRIBERS=false python -m unittest
Tests
$ python -m unittest