Depends on aiomqtt

June 6, 2024 ยท View on GitHub

pip install aiomqtt

Based on work done in https://github.com/encode/broadcaster/pull/117

import asyncio import typing from urllib.parse import urlparse

import aiomqtt

from .._base import Event from .base import BroadcastBackend

class MqttBackend(BroadcastBackend): def init(self, url: str): parsed_url = urlparse(url) self._host = parsed_url.hostname or "localhost" self._port = 8883 if parsed_url.scheme == "mqtts" else 1883 self._port = parsed_url.port or self._port self._client = aiomqtt.Client(self._host, port=self._port) self._queue: asyncio.Queue[aiomqtt.Message] = asyncio.Queue() self._listener_task = asyncio.create_task(self._listener())

async def connect(self) -> None:
    await self._client.__aenter__()

async def disconnect(self) -> None:
    self._listener_task.cancel()
    try:
        await self._listener_task
    except asyncio.CancelledError:
        pass

    await self._client.__aexit__(None, None, None)

async def subscribe(self, channel: str) -> None:
    await self._client.subscribe(channel)

async def unsubscribe(self, channel: str) -> None:
    await self._client.unsubscribe(channel)

async def publish(self, channel: str, message: typing.Any) -> None:
    await self._client.publish(channel, message, retain=False)

async def next_published(self) -> Event:
    message = await self._queue.get()

    # Event.message is string, not bytes
    # this is a limiting factor and we need to make sure
    # that the payload is bytes in order to properly decode it
    assert isinstance(message.payload, bytes), "Payload must be bytes."

    return Event(channel=message.topic.value, message=message.payload.decode())

async def _listener(self) -> None:
    async for message in self._client.messages:
        await self._queue.put(message)