
MQTT

MqttActor integrates with MQTT brokers such as Mosquitto, EMQ X, HiveMQ, and AWS IoT Core. It supports MQTT 3.1.1 and MQTT 5, topic wildcards (+, #), QoS levels 0, 1, and 2, and retained messages.

import { ActorSystem, Props, MqttActor } from 'actor-ts';

// Assumes `system` (an ActorSystem) and `telemetryHandler` (the subscribing actor) already exist.
const mqtt = system.actorOf(
  Props.create(() => new MqttActor({
    url: 'mqtts://mqtt.example.com:8883',
    clientId: 'my-app',
    username: process.env.MQTT_USER,
    password: process.env.MQTT_PASS,
  })),
  'mqtt',
);

// Subscribe to a topic (with optional wildcards):
mqtt.tell({
  kind: 'subscribe-mqtt',
  topic: 'sensors/+/temp',
  qos: 1,
  subscriber: telemetryHandler,
});

// Publish:
mqtt.tell({
  kind: 'publish',
  topic: 'commands/device-42',
  payload: JSON.stringify({ action: 'reboot' }),
  qos: 1,
  retain: false,
});

MqttActor accepts these settings:

interface MqttActorSettings extends BrokerCommonSettings {
  url: string;                      // mqtt:// or mqtts://
  clientId?: string;
  username?: string;
  password?: string;
  keepalive?: number;               // seconds; default 60
  clean?: boolean;                  // default true
  protocolVersion?: 3 | 4 | 5;      // default 4 (= MQTT 3.1.1)
  will?: {
    topic: string;
    payload: string | Uint8Array;
    qos: 0 | 1 | 2;
    retain: boolean;
  };
}

Common patterns:

  • clean: false — session persists; broker delivers messages missed during disconnection (subject to broker config).
  • will: { ... } — broker publishes this when the client disconnects ungracefully. Useful for presence (“device-42-offline”).
  • protocolVersion: 5 — enables MQTT 5 features (response topics, message expiry). Requires a v5-capable broker.
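
The first two options are often combined for device presence. The sketch below builds such a settings object. `PresenceSettings` is a local copy of the relevant fields from MqttActorSettings; the `presenceSettings` helper, the `presence/<id>` topic layout, and the 'offline' payload are illustrative assumptions, not part of actor-ts.

```typescript
// Local copy of the fields used here, mirroring MqttActorSettings.
interface PresenceSettings {
  url: string;
  clientId: string;
  clean: boolean;
  will: { topic: string; payload: string; qos: 0 | 1 | 2; retain: boolean };
}

// Hypothetical helper: settings for a device that keeps its session
// and lets the broker announce an ungraceful disconnect.
function presenceSettings(deviceId: string): PresenceSettings {
  return {
    url: 'mqtts://mqtt.example.com:8883',
    // A persistent session is keyed by client ID, so the ID must stay
    // the same across reconnects.
    clientId: deviceId,
    clean: false,
    will: {
      topic: `presence/${deviceId}`,
      payload: 'offline',
      qos: 1,
      // Retained, so subscribers that arrive later still see the marker.
      retain: true,
    },
  };
}

const settings = presenceSettings('device-42');
```

An object like this could be passed to `new MqttActor(...)` in place of the literal in the first example.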

| QoS | Delivery | Cost |
| --- | --- | --- |
| 0 | At-most-once. Fire-and-forget. | Cheapest; messages may be lost. |
| 1 | At-least-once. Broker stores until acked. | Default for most use cases. Duplicates possible. |
| 2 | Exactly-once. Full handshake. | Slowest; rarely needed. |

For IoT telemetry, QoS 1 is the usual balance between reliability and overhead. Commands to devices should also use QoS 1, or QoS 2 if a duplicate would be harmful. For readings that are quickly superseded by the next one, QoS 0 is fine.
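
Because QoS 1 can deliver the same message more than once, handlers for non-idempotent actions usually need to deduplicate. A minimal sketch, assuming every payload carries an application-level `id` field; the `Command` shape, the `Deduplicator` class, and the `handle` function are hypothetical and not part of actor-ts:

```typescript
// Hypothetical payload shape: the publisher attaches a unique id to every command.
interface Command {
  id: string;
  action: string;
}

// Remembers recently seen ids so that a redelivered message can be skipped.
class Deduplicator {
  private readonly seen = new Set<string>();
  private readonly maxEntries: number;

  constructor(maxEntries: number = 1000) {
    this.maxEntries = maxEntries;
  }

  // Returns true the first time an id is seen, false for a duplicate.
  firstTime(id: string): boolean {
    if (this.seen.has(id)) return false;
    this.seen.add(id);
    if (this.seen.size > this.maxEntries) {
      // A Set iterates in insertion order, so the first value is the oldest id.
      const oldest = this.seen.values().next().value;
      if (oldest !== undefined) this.seen.delete(oldest);
    }
    return true;
  }
}

const dedup = new Deduplicator();
const executed: string[] = [];

function handle(raw: string): void {
  const cmd = JSON.parse(raw) as Command;
  if (dedup.firstTime(cmd.id)) executed.push(cmd.action);
}

// The same command delivered twice is executed only once.
const reboot = JSON.stringify({ id: 'cmd-1', action: 'reboot' });
handle(reboot);
handle(reboot);
```

The bounded set keeps memory use predictable, at the price of forgetting very old ids. Whether that trade-off is acceptable depends on how late a duplicate can arrive.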

mqtt.tell({ kind: 'subscribe-mqtt', topic: 'sensors/+/temp', qos: 1, subscriber: handler });
// matches: 'sensors/dev1/temp', 'sensors/dev2/temp', ...
mqtt.tell({ kind: 'subscribe-mqtt', topic: 'devices/#', qos: 1, subscriber: handler });
// matches: 'devices/anything/anywhere/...'

| Wildcard | Matches |
| --- | --- |
| + | Exactly one level. |
| # | Multiple levels (must be the last segment). |

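
The matching rules can be written out as a small function, which is handy for unit-testing topic layouts without a broker. This is a sketch of the standard MQTT matching rules, not code from actor-ts; `topicMatches` is a hypothetical helper, and it ignores the special handling of topics that begin with `$`.

```typescript
// Hypothetical helper: does `topic` match the subscription `filter`?
function topicMatches(filter: string, topic: string): boolean {
  const f = filter.split('/');
  const t = topic.split('/');
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') {
      // '#' is only valid as the last segment; it matches the parent
      // level and everything below it.
      return i === f.length - 1;
    }
    // The filter has more levels than the topic.
    if (i >= t.length) return false;
    // '+' accepts any single level; anything else must match exactly.
    if (f[i] !== '+' && f[i] !== t[i]) return false;
  }
  // Without '#', both must have the same number of levels.
  return f.length === t.length;
}

const oneLevel = topicMatches('sensors/+/temp', 'sensors/dev1/temp');
const twoLevels = topicMatches('sensors/+/temp', 'sensors/dev1/room/temp');
const subtree = topicMatches('devices/#', 'devices/anything/anywhere');
```

Here `oneLevel` and `subtree` are true, while `twoLevels` is false because `+` never spans more than one level.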
mqtt.tell({
  kind: 'publish',
  topic: 'config/device-42',
  payload: JSON.stringify({ ... }),
  retain: true, // ← stored by the broker
});

A retained message is stored by the broker and delivered immediately to any new subscriber. Use for:

  • Configuration — devices reading the latest config on connect.
  • Last-known-state — a device’s most-recent status visible to fresh subscribers.

Set retain: false for normal pub/sub messages. Otherwise the broker keeps the last message published on each topic and replays it to every new subscriber, long after it has gone stale.
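
The broker keeps at most one retained message per topic: a newer retained publish replaces the older one, and a retained publish with an empty payload clears it. The following in-memory model illustrates that behavior. `TinyBroker` is a hypothetical toy for illustration, supports exact topic names only, and is unrelated to actor-ts or any real broker.

```typescript
type Listener = (topic: string, payload: string) => void;

// Toy model of a broker's retained-message store (exact topics only).
class TinyBroker {
  private readonly retained = new Map<string, string>();
  private readonly listeners = new Map<string, Listener[]>();

  publish(topic: string, payload: string, retain: boolean): void {
    if (retain) {
      // Only the most recent retained payload per topic is kept;
      // an empty payload removes the retained message.
      if (payload === '') this.retained.delete(topic);
      else this.retained.set(topic, payload);
    }
    for (const listener of this.listeners.get(topic) ?? []) {
      listener(topic, payload);
    }
  }

  subscribe(topic: string, listener: Listener): void {
    const list = this.listeners.get(topic) ?? [];
    list.push(listener);
    this.listeners.set(topic, list);
    // A new subscriber immediately receives the retained message, if any.
    const last = this.retained.get(topic);
    if (last !== undefined) listener(topic, last);
  }
}

const broker = new TinyBroker();
broker.publish('config/device-42', '{"interval":30}', true);
broker.publish('config/device-42', '{"interval":60}', true);

const received: string[] = [];
broker.subscribe('config/device-42', (_topic, payload) => {
  received.push(payload);
});
```

After the subscribe call, `received` holds only the second payload, because the first retained message was overwritten before the subscriber arrived.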

Subscribers receive MqttMessage:

interface MqttMessage {
  readonly topic: string;
  readonly payload: Uint8Array;
  readonly qos: 0 | 1 | 2;
  readonly retain: boolean;
}

class TelemetryHandler extends Actor<MqttMessage> {
  override onReceive(msg: MqttMessage): void {
    // The payload arrives as raw bytes; decode to text before parsing.
    const data = JSON.parse(new TextDecoder().decode(msg.payload));
    this.log.info(`${msg.topic}: ${data.value}`);
  }
}
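
JSON.parse throws on malformed input, and a topic can carry payloads from any publisher. A defensive variant of the decoding step might look like the sketch below; `decodeJson` is a hypothetical helper, not part of actor-ts.

```typescript
// Decodes a UTF-8 JSON payload; returns undefined instead of throwing
// when the bytes are not valid UTF-8 or not valid JSON.
function decodeJson(payload: Uint8Array): unknown {
  try {
    // fatal: true makes invalid UTF-8 throw instead of being silently replaced.
    const text = new TextDecoder('utf-8', { fatal: true }).decode(payload);
    return JSON.parse(text);
  } catch {
    return undefined;
  }
}

const good = decodeJson(new TextEncoder().encode('{"value":21.5}'));
const bad = decodeJson(new TextEncoder().encode('not json'));
```

A handler can then check for `undefined` and log or drop the message rather than letting the exception escape from onReceive.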

Install the mqtt package:

npm install mqtt
# or: bun add mqtt

Three good fits:

  1. IoT / telemetry — devices publishing sensor data, subscribing to commands.
  2. Bridging from existing MQTT infrastructure — when the broker is already there.
  3. Lightweight pub/sub — when Kafka is overkill (small payloads, no replay history needed).

For cluster-internal pub/sub, DistributedPubSub is simpler (no external broker).