Aedes

February 9, 2026

new Aedes([options])

  • options <object>

    • mq <MQEmitter> middleware used to deliver messages to subscribed clients. In a cluster environment it is also used to share messages between broker instances. Default: mqemitter

    • concurrency <number> maximum number of concurrent messages that can be processed simultaneously by mq. Default: 100

      Note: Concurrency determines how many messages can be "in-flight" at once. When a slow client blocks (socket buffer full), the message delivery hangs waiting for the drain event. With N concurrency slots and ANY frozen subscriber on a topic, fast clients will receive at most N messages before complete deadlock. Higher concurrency = larger buffer before freeze, but without drainTimeout, deadlock is inevitable. Use drainTimeout to protect against this.

    • persistence <Persistence> middleware that stores QoS > 0, retained, and will packets, as well as subscriptions. Default: aedes-persistence (in memory). Versions 1.x and above require persistence to support async access; see MIGRATION.md for details.

    • queueLimit <number> maximum number of queued messages before the client session is established. If the number of queued messages exceeds this limit, connectionError is raised with the error Client queue limit reached. Default: 42

    • maxClientsIdLength <number> overrides the client ID length limit of MQTT 3.1.0. Default: 23

    • heartbeatInterval <number> interval in milliseconds at which the server publishes its health signal on the $SYS/<aedes.id>/heartbeat topic. Default: 60000

    • id <string> aedes broker unique identifier. Default: uuidv4()

    • connectTimeout <number> maximum waiting time in milliseconds waiting for a CONNECT packet. Default: 30000

    • keepaliveLimit <number> maximum client keep alive time allowed, 0 means no limit. Default: 0

    • drainTimeout <number> maximum time in milliseconds to wait for a slow client's socket to drain before disconnecting it. When a client's socket buffer fills up (e.g., slow network, unresponsive client), the broker waits for the drain event. Without a timeout, one slow client can block message delivery to all other clients. Set to 0 to disable and wait indefinitely (not recommended). Default: 60000 (60 seconds)

      Why use drainTimeout? When publishing messages, if a client's TCP buffer is full, socket.write() returns false and the broker waits for the drain event before continuing. If the client stops reading (slow 3G, crashed app, malicious client), drain never fires and that message hangs forever. Even with high concurrency, a single frozen subscriber will eventually exhaust all slots and cause complete deadlock - no more messages can be delivered to ANY client. This is a DoS vulnerability.

      Recommended settings:

      • Production: 10000 - 60000 ms (10-60 seconds)
      • High-latency networks: Higher values to avoid disconnecting legitimate slow clients
      • IoT/embedded devices: Consider client capabilities when setting timeout
      // Recommended for production
      const broker = await Aedes.createBroker({
        drainTimeout: 30000  // Disconnect unresponsive clients after 30 seconds (default: 60000)
        // drainTimeout: 0   // Disable timeout - NOT RECOMMENDED: vulnerable to DoS
      })
      
      // Monitor disconnections (optional)
      broker.on('clientDisconnect', (client) => {
        console.log(`Client ${client.id} disconnected`)
      })
      
  • Returns <Aedes>

Create a new Aedes server instance.

Aedes is the class exported by this module. The instance will only start listening after aedes.listen() is called. The recommended way to start an Aedes server is to use Aedes.createBroker([options]) instead.
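The drainTimeout option described above can be pictured with a small sketch. This is not the Aedes implementation; writeWithDrainTimeout is a hypothetical helper that only assumes a socket-like object with write(), once(), removeListener() and destroy(), as provided by net.Socket.

```javascript
// Hypothetical helper, not part of the Aedes API.
// Writes a chunk; if the socket reports backpressure, waits for 'drain'
// for at most timeoutMs milliseconds (0 waits indefinitely).
function writeWithDrainTimeout (socket, chunk, timeoutMs, done) {
  if (socket.write(chunk)) {
    return done(null, 'written') // buffer had room, nothing to wait for
  }
  let timer = null
  const onDrain = () => {
    if (timer) clearTimeout(timer)
    done(null, 'drained')
  }
  socket.once('drain', onDrain)
  if (timeoutMs > 0) {
    timer = setTimeout(() => {
      // The client never drained: stop waiting and drop the connection
      socket.removeListener('drain', onDrain)
      socket.destroy(new Error('drain timeout'))
      done(null, 'timeout')
    }, timeoutMs)
  }
}
```

With timeoutMs set to 0 the helper never gives up, which corresponds to the deadlock scenario described for frozen subscribers.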

Aedes.createBroker([options])

An async static method in the Aedes class which creates the instance and automatically awaits listen().

Using Aedes.createBroker([options]) is the recommended way to start Aedes. Example:

const aedes = await Aedes.createBroker([options])

It uses the same options as new Aedes([options])

aedes.listen()

Async method to make the aedes instance start listening. Example:

const aedes = new Aedes([options])
await aedes.listen()

You should typically not need to use this as it is more compact to use Aedes.createBroker([options]) instead.

aedes.id

  • <string> Default: uuidv4()

Server unique identifier.

aedes.connectedClients

  • <number> Default: 0

Number of connected clients in server.

aedes.closed

  • <boolean> Default: false

A read-only flag that indicates whether the server is closed.

Event: client

Emitted when the client registers itself with the server. The client is not ready yet; its connecting state is true.

The server publishes to the SYS topic $SYS/<aedes.id>/new/clients to announce that it has added the client to its registration pool. The payload is client.id.

Event: clientReady

Emitted when the client has received all its offline messages and has been initialized. The client's connected state is true and it is ready to process incoming messages.

Event: clientDisconnect

Emitted when a client disconnects.

The server publishes to the SYS topic $SYS/<aedes.id>/disconnect/clients to announce that it has deregistered the client. The payload is client.id.

Event: clientError

Emitted when an error occurs.

Event: connectionError

Emitted when an error occurs. Unlike clientError, it is raised only while the client is uninitialized.

Event: keepaliveTimeout

Emitted when the client's keepalive times out.

Event: publish

Emitted when the server delivers the packet to subscribed clients. If no clients are subscribed to the packet topic, the server still publishes the packet and emits the event. client is null when the packet is an internal message, such as the aedes heartbeat message or an LWT.

Note! packet is of the aedes-packet type. Some of its properties are internal to aedes, and changing them will break the aedes internal flow.
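As a minimal sketch of a publish listener that respects the note above, the following function only reads from packet and skips internal messages. describePublish is a hypothetical name, and the listener signature (packet, client) is assumed from the published handler documented later.

```javascript
// Hypothetical helper for a publish listener.
// Returns a log line, or null for internal messages (client is null).
function describePublish (packet, client) {
  if (!client) return null // heartbeat, LWT and other internal messages
  // Read-only access: packet is never modified here
  return `${client.id} published ${packet.payload.length} bytes to ${packet.topic}`
}

// Usage, assuming `aedes` is a running broker instance:
// aedes.on('publish', (packet, client) => {
//   const line = describePublish(packet, client)
//   if (line) console.log(line)
// })
```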

Event: ack

Emitted when a QoS 1 or 2 packet has been successfully delivered to the client and acknowledged.

Event: ping

Emitted when client sends a PINGREQ.

Event: subscribe

  • subscriptions <object>
  • client <Client>

Emitted when the client successfully subscribes to the subscriptions on the server.

subscriptions is an array of { topic: topic, qos: qos }. The array excludes duplicated topics and includes negated subscriptions, for which qos equals 128. See authorizeSubscribe for more.

The server publishes to the SYS topic $SYS/<aedes.id>/new/subscribers to announce that a client has successfully subscribed to one or more topics. The payload is a JSON object with clientId and subs properties, where subs is the subscriptions array.
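Because negated subscriptions appear in the same array as granted ones, a listener usually needs to tell them apart. A minimal sketch follows; splitSubscriptions is a hypothetical helper, not part of Aedes.

```javascript
// Splits the subscriptions array of the subscribe event into granted and
// negated topics. A qos of 128 marks a negated subscription.
function splitSubscriptions (subscriptions) {
  const granted = []
  const negated = []
  for (const sub of subscriptions) {
    if (sub.qos === 128) {
      negated.push(sub.topic)
    } else {
      granted.push(sub.topic)
    }
  }
  return { granted, negated }
}

// Usage, assuming `aedes` is a running broker instance:
// aedes.on('subscribe', (subscriptions, client) => {
//   const { granted, negated } = splitSubscriptions(subscriptions)
//   console.log(client.id, 'granted:', granted, 'negated:', negated)
// })
```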

Event: unsubscribe

  • unsubscriptions Array<string>
  • client <Client>

Emitted when the client successfully unsubscribes from the subscriptions on the server.

unsubscriptions is an array of unsubscribed topics.

The server publishes to the SYS topic $SYS/<aedes.id>/new/unsubscribers to announce that a client has successfully unsubscribed from one or more topics. The payload is a JSON object with clientId and subs properties, where subs is the unsubscriptions array.

Event: connackSent

Emitted when the server sends an acknowledgement to the client. Please refer to the MQTT specification for an explanation of the returnCode property in CONNACK.

Event: closed

Emitted when server is closed.

aedes.handle (stream)

  • stream: <net.Socket> | <stream.Duplex>
  • Returns: <Client>

A connection listener that pipes the stream to aedes.

import { createServer } from 'node:net'
import { Aedes } from 'aedes'

const aedes = await Aedes.createBroker()
const server = createServer(aedes.handle)

aedes.subscribe (topic, deliverfunc, callback)

  • topic: <string>
  • deliverfunc: <Function> (packet, cb) => void
    • packet: <aedes-packet> & PUBLISH
    • cb: <Function>
  • callback: <Function>

Subscribes to a topic directly on the server side. Bypasses authorizeSubscribe.

The topic and deliverfunc together form a compound key that identifies the subscription in the subscription pool. topic may be one that is already subscribed; in that case deliverfunc is invoked just as it would be for a client SUBSCRIBE.

deliverfunc supports backpressure.

Internally, aedes uses a deliverfunc to deliver messages to subscribed clients.

Note! packet is of the aedes-packet type. Some of its properties are internal to aedes, and changing them will break the aedes internal flow.

In general, most properties of packet are the same as those of the incoming PUBLISH. The cmd property of the packet passed to deliverfunc is always publish.

Note! deliverfunc must call cb before the function returns, otherwise some clients subscribed to the same topic will not receive messages.

callback is invoked when the server has successfully registered the subscription.

aedes.unsubscribe (topic, deliverfunc, callback)

Reverse of aedes.subscribe.

Note! deliverfunc must be the same function that was passed to aedes.subscribe, otherwise the unsubscription will fail.
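The two requirements above (call cb before returning, and keep the same deliverfunc reference for unsubscribing) can be sketched as follows. collect is a hypothetical helper, and broker stands for any object exposing subscribe(topic, deliverfunc, callback) as documented here.

```javascript
// Hypothetical helper: subscribes a server-side collector to `topic` and
// returns the deliverfunc so the same reference can be passed to
// aedes.unsubscribe later.
function collect (broker, topic, sink, onReady) {
  function deliver (packet, cb) {
    sink.push({ topic: packet.topic, payload: packet.payload.toString() })
    cb() // called before returning, so other subscribers are not held up
  }
  broker.subscribe(topic, deliver, onReady)
  return deliver
}

// Usage, assuming `aedes` is a running broker instance:
// const messages = []
// const deliver = collect(aedes, 'sensors/#', messages, () => console.log('subscribed'))
// aedes.unsubscribe('sensors/#', deliver, () => console.log('unsubscribed'))
```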

aedes.publish (packet, callback)

  • packet <object> PUBLISH
  • callback <Function> (error) => void
    • error <Error> | null

Delivers packet directly to subscribed clients on behalf of the server. Bypasses authorizePublish.

callback is invoked with the error argument when publishing finishes.
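A minimal sketch of a server-side publish. The packet fields follow the usual MQTT PUBLISH shape (topic, payload, qos, retain); publishStatus and the topic name status/broker are illustrative assumptions.

```javascript
// Hypothetical helper: publishes a retained status message from the server.
function publishStatus (broker, status, callback) {
  const packet = {
    cmd: 'publish',
    topic: 'status/broker', // illustrative topic name
    payload: Buffer.from(JSON.stringify(status)),
    qos: 0,
    retain: true
  }
  broker.publish(packet, callback)
}

// Usage, assuming `aedes` is a running broker instance:
// publishStatus(aedes, { up: true }, (error) => {
//   if (error) console.error('publish failed', error)
// })
```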

aedes.close ([callback])

  • callback: <Function>

Closes the aedes server and disconnects all clients.

callback is invoked when the server is closed.

Handler: preConnect (client, packet, callback)

  • client: <Client>
  • packet: <object> CONNECT
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked when server receives a valid CONNECT packet. The packet can be modified.

The client object is in its default state. If callback is invoked with no error and successful is true, the server continues to establish the session.

Any error is raised in the connectionError event.

Some Use Cases:

  1. Rate Limit / Throttle by client.conn.remoteAddress
  2. Check aedes.connectedClients to limit maximum connections
  3. IP blacklisting
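// Accept only connections from the IPv6 loopback address
aedes.preConnect = function(client, packet, callback) {
  callback(null, client.conn.remoteAddress === '::1')
}
// Passing an error rejects the connection; the error is raised in connectionError
aedes.preConnect = function(client, packet, callback) {
  callback(new Error('connection error'), client.conn.remoteAddress !== '::1')
}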
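
The second use case above, limiting the number of connections, could look roughly like this. limitConnections is a hypothetical factory and the limit of 1000 in the usage note is arbitrary; it only relies on the connectedClients property documented earlier.

```javascript
// Hypothetical factory: builds a preConnect handler that refuses new
// connections once `max` clients are connected.
function limitConnections (broker, max) {
  return function preConnect (client, packet, callback) {
    if (broker.connectedClients >= max) {
      // The error is raised in the connectionError event
      return callback(new Error('too many connections'), false)
    }
    callback(null, true)
  }
}

// Usage, assuming `aedes` is a broker instance:
// aedes.preConnect = limitConnections(aedes, 1000)
```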

Handler: authenticate (client, username, password, callback)

  • client: <Client>
  • username: <string>
  • password: <Buffer>
  • callback: <Function> (error, successful) => void
    • error <Error> | null
    • successful <boolean>

Invoked after preConnect.

The server parses the CONNECT packet, initializes the client object, sets client.id to the one in the CONNECT packet, and extracts username and password as parameters for the user-defined authentication flow.

If callback is invoked with no error and successful is true, the server authenticates the client and continues to set up the client session.

If authenticated, the server acknowledges with a CONNACK with returnCode=0, otherwise returnCode=5. Users can choose a value between 2 and 5 by setting a returnCode property on the error object.

aedes.authenticate = function (client, username, password, callback) {
  callback(null, username === 'matteo')
}
aedes.authenticate = function (client, username, password, callback) {
  var error = new Error('Auth error')
  error.returnCode = 4
  callback(error, null)
}

Please refer to Connect Return Code to see their meanings.
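
Putting the pieces together, a handler that checks credentials against a lookup table and reports return code 4 (bad user name or password in MQTT 3.1.1) might be sketched like this. The users table and its contents are purely illustrative; a real deployment would use a proper credential store with hashed passwords.

```javascript
// Illustrative in-memory credential table (assumption, not part of Aedes)
const users = new Map([['matteo', 's3cret']])

function authenticate (client, username, password, callback) {
  const expected = users.get(username)
  // password arrives as a Buffer, or is missing when the client sent none
  if (expected !== undefined && password && password.toString() === expected) {
    return callback(null, true)
  }
  const error = new Error('Bad username or password')
  error.returnCode = 4 // CONNACK return code: bad user name or password
  callback(error, false)
}

// Usage, assuming `aedes` is a broker instance:
// aedes.authenticate = authenticate
```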

Handler: authorizePublish (client, packet, callback)

  • client: <Client> | null
  • packet: <object> PUBLISH
  • callback: <Function> (error) => void
    • error <Error> | null

Invoked when

  1. publish LWT to all online clients
  2. incoming client publish

client is null when aedes publishes an obsolete LWT without connected clients.

If callback is invoked with no error, the server authorizes the packet; otherwise it emits clientError with the error. If an error occurs, the client connection is closed, but no error is returned to the client (MQTT-3.3.5-2).

aedes.authorizePublish = function (client, packet, callback) {
  if (packet.topic === 'aaaa') {
    return callback(new Error('wrong topic'))
  }
  if (packet.topic === 'bbb') {
    packet.payload = Buffer.from('overwrite packet payload')
  }
  callback(null)
}

By default, authorizePublish returns an error when a client publishes to topics with the $SYS/ prefix, to prevent a possible DoS (see #597). If you write your own implementation of authorizePublish, we suggest you add a check for this. Default implementation:

function defaultAuthorizePublish (client, packet, callback) {
  if (packet.topic.startsWith($SYS_PREFIX)) {
    return callback(new Error($SYS_PREFIX + ' topic is reserved'))
  }
  callback(null)
}
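
A custom handler that keeps the $SYS/ check and adds a per-client rule could be sketched as follows. The devices/<client.id>/ topic layout is a made-up convention for illustration, and letting LWT packets with a null client through is a policy assumption of this sketch.

```javascript
const SYS_PREFIX = '$SYS/'

function authorizePublish (client, packet, callback) {
  // Keep the default protection for reserved topics
  if (packet.topic.startsWith(SYS_PREFIX)) {
    return callback(new Error(SYS_PREFIX + ' topic is reserved'))
  }
  // client is null for an LWT published without a connected client
  if (client !== null && !packet.topic.startsWith(`devices/${client.id}/`)) {
    return callback(new Error('not allowed to publish to ' + packet.topic))
  }
  callback(null)
}

// Usage, assuming `aedes` is a broker instance:
// aedes.authorizePublish = authorizePublish
```

Keep in mind that rejecting a publish closes the client connection, so a rule like this is strict by design.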

Handler: authorizeSubscribe (client, subscription, callback)

  • client: <Client>
  • subscription: <object>
  • callback: <Function> (error) => void
    • error <Error> | null
    • subscription: <object> | null

Invoked when

  1. restore subscriptions in non-clean session.
  2. incoming client SUBSCRIBE

subscription is a dictionary object like { topic: 'hello', qos: 0 }.

If callback is invoked with no error, the server authorizes the subscription; otherwise it emits clientError with the error.

In general, the user should pass the subscription to callback unchanged, but the server allows the subscription to be changed on the fly.

aedes.authorizeSubscribe = function (client, sub, callback) {
  if (sub.topic === 'aaaa') {
    return callback(new Error('wrong topic'))
  }
  if (sub.topic === 'bbb') {
    // overwrites subscription
    sub.topic = 'foo'
    sub.qos = 1
  }
  callback(null, sub)
}

To negate a subscription, set the subscription to null. Aedes ignores the negated subscription and the qos in SUBACK is set to 128, as defined by the MQTT 3.1.1 spec:

aedes.authorizeSubscribe = function (client, sub, callback) {
  // prohibited to subscribe 'aaaa' and suppress error
  callback(null, sub.topic === 'aaaa' ? null : sub)
}
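
Both techniques, rewriting and negating, can be combined in one handler. In this sketch the admin- client ID prefix is a hypothetical naming convention, and capping QoS at 1 is an arbitrary example policy.

```javascript
function authorizeSubscribe (client, sub, callback) {
  const isAdmin = client.id.startsWith('admin-') // hypothetical convention
  if (sub.topic.startsWith('$SYS/') && !isAdmin) {
    // Negate silently: the SUBACK carries 128 for this topic
    return callback(null, null)
  }
  // Rewrite on the fly: cap the granted QoS at 1
  sub.qos = Math.min(sub.qos, 1)
  callback(null, sub)
}

// Usage, assuming `aedes` is a broker instance:
// aedes.authorizeSubscribe = authorizeSubscribe
```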

Handler: authorizeForward (client, packet)

  • client: <Client>
  • packet: <aedes-packet> & PUBLISH
  • Returns: <aedes-packet> | null

Invoked when

  1. aedes sends retained messages when client reconnects
  2. aedes pre-delivers subscribed message to clients

Returning null prevents the packet from being forwarded to the client.

In general, the user should return the packet unchanged, but the server allows the packet to be changed on the fly before it is forwarded to clients.

Note! packet is of the aedes-packet type. Some of its properties are internal to aedes, and changing them will break the aedes internal flow.

aedes.authorizeForward = function (client, packet) {
  if (packet.topic === 'aaaa' && client.id === "I should not see this") {
    return null // do not forward this packet to the client
  }
  if (packet.topic === 'bbb') {
    // Buffer.from replaces the deprecated new Buffer()
    packet.payload = Buffer.from('overwrite packet payload')
  }
  return packet
}
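
A pattern-based variant might hide per-client topics from everyone else. The private/<clientId>/ topic layout is an assumption made for this sketch only.

```javascript
function authorizeForward (client, packet) {
  // Topics shaped like private/<clientId>/... are visible to that client only
  const match = /^private\/([^/]+)\//.exec(packet.topic)
  if (match && match[1] !== client.id) {
    return null
  }
  return packet // forwarded unchanged
}

// Usage, assuming `aedes` is a broker instance:
// aedes.authorizeForward = authorizeForward
```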

Handler: published (packet, client, callback)

Same as Event: publish, but provides backpressure. TL;DR: use this handler if the operations on a packet MUST finish before the next packet is handled; otherwise, especially for long-running operations, use Event: publish instead.
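
A minimal sketch of such a handler, assuming a hypothetical appendToLog(entry, done) function that calls done when the entry has been stored. The broker is not told to continue until callback runs.

```javascript
// Hypothetical factory: builds a published handler that records each packet
// in an audit log before the next packet is handled.
function auditPublished (appendToLog) {
  return function published (packet, client, callback) {
    // client is null for internal messages such as the heartbeat
    const entry = { topic: packet.topic, clientId: client ? client.id : null }
    appendToLog(entry, callback)
  }
}

// Usage, assuming `aedes` is a broker instance and `myAppend` is your own
// (entry, done) => void function:
// aedes.published = auditPublished(myAppend)
```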