MQTT
`MqttActor` integrates with MQTT brokers (Mosquitto, EMQ X, HiveMQ,
AWS IoT Core). It supports MQTT 3.1.1 and 5, topic wildcards
(`+`, `#`), QoS 0/1/2, and retained messages.

```ts
import { ActorSystem, Props, MqttActor } from 'actor-ts';

// `system` is an existing ActorSystem instance.
const mqtt = system.actorOf(
  Props.create(() => new MqttActor({
    url: 'mqtts://mqtt.example.com:8883',
    clientId: 'my-app',
    username: process.env.MQTT_USER,
    password: process.env.MQTT_PASS,
  })),
  'mqtt',
);

// Subscribe to a topic (with optional wildcards):
mqtt.tell({
  kind: 'subscribe-mqtt',
  topic: 'sensors/+/temp',
  qos: 1,
  subscriber: telemetryHandler, // an actor that receives the inbound messages
});

// Publish:
mqtt.tell({
  kind: 'publish',
  topic: 'commands/device-42',
  payload: JSON.stringify({ action: 'reboot' }),
  qos: 1,
  retain: false,
});
```

Settings

```ts
interface MqttActorSettings extends BrokerCommonSettings {
  url: string;                   // mqtt:// or mqtts://
  clientId?: string;
  username?: string;
  password?: string;
  keepalive?: number;            // seconds; default 60
  clean?: boolean;               // default true
  protocolVersion?: 3 | 4 | 5;   // default 4 (= MQTT 3.1.1)
  will?: {
    topic: string;
    payload: string | Uint8Array;
    qos: 0 | 1 | 2;
    retain: boolean;
  };
}
```

Common patterns:
- `clean: false` keeps the session alive, so the broker delivers messages missed during disconnection (subject to broker config).
- `will: { ... }` is published by the broker when the client disconnects ungracefully. Useful for presence (“device-42-offline”).
- `protocolVersion: 5` enables MQTT 5 features (response topics, message expiry). Requires a v5-capable broker.
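
The `clean` and `will` patterns above are often combined for device presence. The following is a minimal sketch, not an actor-ts API: `presenceSettings` is a hypothetical helper, the `presence/<deviceId>` topic layout is an assumption, and the `Will` type only mirrors the `will` shape from `MqttActorSettings` so the snippet type-checks on its own.

```typescript
// Local mirror of the `will` shape from MqttActorSettings, so the sketch stands alone.
type Will = {
  topic: string;
  payload: string | Uint8Array;
  qos: 0 | 1 | 2;
  retain: boolean;
};

// Hypothetical helper: builds the presence-related part of the settings.
// The `presence/<deviceId>` topic layout is an assumption, not a library convention.
function presenceSettings(deviceId: string): { clientId: string; clean: boolean; will: Will } {
  return {
    clientId: deviceId, // a persistent session is tied to the client id, so keep it stable
    clean: false,       // keep the session so missed messages are delivered after reconnect
    will: {
      topic: `presence/${deviceId}`,
      payload: 'offline',
      qos: 1,
      retain: true,     // subscribers that arrive later still see the offline marker
    },
  };
}

const settings = presenceSettings('device-42');
console.log(settings.will.topic); // presence/device-42
```

The result could be spread into the settings object passed to `MqttActor` alongside `url`. Publishing a retained `online` message to the same topic after connecting would complete the pattern, since the will only covers the ungraceful-disconnect case.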
QoS levels
| QoS | Delivery | Cost |
|---|---|---|
| 0 | At-most-once. Fire-and-forget. | Cheapest; messages may be lost. |
| 1 | At-least-once. Sender stores the message until acked. | Default for most use cases. Duplicates possible. |
| 2 | Exactly-once. Full handshake. | Slowest; rarely needed. |
For IoT telemetry, QoS 1 is the typical balance. Commands to devices: also QoS 1 (or QoS 2 if duplicates would be harmful). Status updates that are quickly superseded (sensor readings): QoS 0 is fine.
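
Because QoS 1 can deliver the same message more than once, a receiver can often stay on QoS 1 by making command handling idempotent instead of moving to QoS 2. The sketch below assumes each command carries an application-level `id` (an assumption; neither MQTT nor actor-ts provides one), and `RecentIds` is a hypothetical helper.

```typescript
// Hypothetical helper: remembers recently seen command ids and flags repeats.
class RecentIds {
  private readonly seen = new Set<string>();
  private readonly capacity: number;

  constructor(capacity: number = 1000) {
    this.capacity = capacity;
  }

  /** Returns true the first time an id is seen, false for a duplicate. */
  firstTime(id: string): boolean {
    if (this.seen.has(id)) return false;
    this.seen.add(id);
    if (this.seen.size > this.capacity) {
      // A Set iterates in insertion order, so the first value is the oldest id.
      const oldest = this.seen.values().next().value as string;
      this.seen.delete(oldest);
    }
    return true;
  }
}

const recent = new RecentIds();
for (const id of ['cmd-1', 'cmd-2', 'cmd-1']) {
  // The second 'cmd-1' simulates a QoS 1 redelivery.
  console.log(id, recent.firstTime(id) ? 'apply' : 'skip duplicate');
}
```

The bounded set trades memory for a limited deduplication window; a duplicate that arrives after its id has been evicted would be applied again.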
Topic wildcards

```ts
mqtt.tell({ kind: 'subscribe-mqtt', topic: 'sensors/+/temp', qos: 1, subscriber: handler });
// matches: 'sensors/dev1/temp', 'sensors/dev2/temp', ...

mqtt.tell({ kind: 'subscribe-mqtt', topic: 'devices/#', qos: 1, subscriber: handler });
// matches: 'devices/anything/anywhere/...'
```

| Wildcard | Matches |
|---|---|
| `+` | Exactly one level. |
| `#` | Zero or more levels (must be the last segment). |
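
The matching rules in the table can be sketched as a small function. This is an illustrative helper (`topicMatches` is a hypothetical name, not part of actor-ts; the broker does the real matching), and it ignores the special handling of topics that start with `$`.

```typescript
// Hypothetical helper: checks an MQTT topic filter against a concrete topic name.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === '#') {
      // '#' is only valid as the last level; it matches the parent and everything below it.
      return i === filterLevels.length - 1;
    }
    if (i >= topicLevels.length) return false;                   // topic has fewer levels than the filter
    if (level !== '+' && level !== topicLevels[i]) return false; // literal level differs
  }
  // No '#' seen: the topic must have exactly as many levels as the filter.
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches('sensors/+/temp', 'sensors/dev1/temp'));      // true
console.log(topicMatches('sensors/+/temp', 'sensors/dev1/room/temp')); // false
console.log(topicMatches('devices/#', 'devices/a/b/c'));               // true
console.log(topicMatches('devices/#', 'devices'));                     // true
```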
Retained messages

```ts
mqtt.tell({
  kind: 'publish',
  topic: 'config/device-42',
  payload: JSON.stringify({ /* ... */ }),
  retain: true, // stored by the broker
});
```

A retained message is stored by the broker and delivered immediately to any new subscriber. Use for:
- Configuration — devices reading the latest config on connect.
- Last-known-state — a device’s most-recent status visible to fresh subscribers.
Set `retain: false` for normal pub/sub messages. Otherwise the
broker keeps the last message published on each topic and replays it to every new subscriber, even after it has gone stale.
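
To make the retained-message semantics concrete, here is a toy model of the broker-side store. It is a sketch for illustration only (`RetainedStore` is hypothetical and handles exact topics, not wildcards). It shows that a broker keeps at most one retained message per topic, and that in MQTT a retained publish with an empty payload clears that slot.

```typescript
// Toy model of a broker's retained-message store; not actor-ts or real broker code.
class RetainedStore {
  private readonly byTopic = new Map<string, string>();

  publish(topic: string, payload: string, retain: boolean): void {
    if (!retain) return; // normal messages are never stored
    if (payload === '') {
      this.byTopic.delete(topic); // empty retained payload clears the slot
    } else {
      this.byTopic.set(topic, payload); // a newer retained message replaces the older one
    }
  }

  /** What a fresh subscriber to exactly this topic would receive on subscribe. */
  onSubscribe(topic: string): string | undefined {
    return this.byTopic.get(topic);
  }
}

const store = new RetainedStore();
store.publish('config/device-42', '{"interval":30}', true);
store.publish('config/device-42', '{"interval":60}', true); // replaces the first
store.publish('sensors/dev1/temp', '21.5', false);          // not stored

console.log(store.onSubscribe('config/device-42'));  // {"interval":60}
console.log(store.onSubscribe('sensors/dev1/temp')); // undefined

store.publish('config/device-42', '', true);         // clear the retained config
console.log(store.onSubscribe('config/device-42'));  // undefined
```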
Inbound messages
Subscribers receive `MqttMessage`:

```ts
import { Actor } from 'actor-ts';

interface MqttMessage {
  readonly topic: string;
  readonly payload: Uint8Array;
  readonly qos: 0 | 1 | 2;
  readonly retain: boolean;
}

class TelemetryHandler extends Actor<MqttMessage> {
  override onReceive(msg: MqttMessage): void {
    // Payloads arrive as raw bytes; decode as UTF-8 before parsing the JSON.
    const data = JSON.parse(new TextDecoder().decode(msg.payload));
    this.log.info(`${msg.topic}: ${data.value}`);
  }
}
```

Peer dependency

```sh
npm install mqtt
# or: bun add mqtt
```

When to use MQTT
Three good fits:
- IoT / telemetry — devices publishing sensor data, subscribing to commands.
- Bridging from existing MQTT infrastructure — when the broker is already there.
- Lightweight pub/sub — when Kafka is overkill (small payloads, no replay history needed).
For cluster-internal pub/sub, DistributedPubSub is simpler (no external broker).
Where to next
- I/O overview: the bigger picture.
- BrokerActor base: the shared lifecycle.
- Kafka: for durable streaming.
- NATS: for lightweight pub/sub.