Aller au contenu
Français

MQTT

Ce contenu n’est pas encore disponible dans votre langue.

MqttActor integrates with MQTT brokers (Mosquitto, EMQ X, HiveMQ, AWS IoT Core). Supports both MQTT 3.1.1 and 5; topic wildcards (+, #); QoS 0/1/2; retained messages.

import { ActorSystem, Props, MqttActor } from 'actor-ts';
const mqtt = system.spawn(
Props.create(() => new MqttActor({
url: 'mqtts://mqtt.example.com:8883',
clientId: 'my-app',
username: process.env.MQTT_USER,
password: process.env.MQTT_PASS,
})),
'mqtt',
);
// Subscribe to a topic (with optional wildcards):
mqtt.tell({
kind: 'subscribe',
topic: 'sensors/+/temp',
qos: 1,
target: telemetryHandler,
});
// Publish:
mqtt.tell({
kind: 'publish',
publish: {
topic: 'commands/device-42',
payload: JSON.stringify({ action: 'reboot' }),
qos: 1,
retain: false,
},
});
interface MqttActorSettings extends BrokerCommonSettings {
url: string; // mqtt:// or mqtts://
clientId?: string;
username?: string;
password?: string;
keepalive?: number; // seconds; default 60
clean?: boolean; // default true
protocolVersion?: 3 | 4 | 5; // default 4 (= MQTT 3.1.1)
will?: {
topic: string;
payload: string | Uint8Array;
qos: 0 | 1 | 2;
retain: boolean;
};
}

Common patterns:

  • clean: false — session persists; broker delivers messages missed during disconnection (subject to broker config).
  • will: { ... } — broker publishes this when the client disconnects ungracefully. Useful for presence (“device-42-offline”).
  • protocolVersion: 5 — enables MQTT 5 features (response topics, message expiry). Requires a v5-capable broker.
QoSDeliveryCost
0At-most-once. Fire-and-forget.Cheapest; messages may be lost.
1At-least-once. Broker stores until acked.Default for most use cases. Duplicates possible.
2Exactly-once. Full handshake.Slowest; rarely needed.

For IoT telemetry, QoS 1 is the typical balance. Commands to devices: also QoS 1 (or QoS 2 if duplicates would be harmful). Status updates that are quickly superseded (sensor readings): QoS 0 is fine.

mqtt.tell({ kind: 'subscribe', topic: 'sensors/+/temp', qos: 1, target: handler });
// matches: 'sensors/dev1/temp', 'sensors/dev2/temp', ...
mqtt.tell({ kind: 'subscribe', topic: 'devices/#', qos: 1, target: handler });
// matches: 'devices/anything/anywhere/...'
WildcardMatches
+Exactly one level.
#Multiple levels (must be the last segment).
mqtt.tell({
kind: 'publish',
publish: {
topic: 'config/device-42',
payload: JSON.stringify({ ... }),
retain: true, // ← stored by the broker
},
});

A retained message is stored by the broker and delivered immediately to any new subscriber. Use for:

  • Configuration — devices reading the latest config on connect.
  • Last-known-state — a device’s most-recent status visible to fresh subscribers.

Set retain false for normal pub-sub messages — otherwise the broker keeps every old message visible forever.

Subscribers receive MqttMessage:

interface MqttMessage {
readonly topic: string;
readonly payload: Uint8Array;
readonly qos: 0 | 1 | 2;
readonly retain: boolean;
}
class TelemetryHandler extends Actor<MqttMessage> {
override onReceive(msg: MqttMessage): void {
const data = JSON.parse(new TextDecoder().decode(msg.payload));
this.log.info(`${msg.topic}: ${data.value}`);
}
}
Terminal window
npm install mqtt
# or: bun add mqtt

Three good fits:

  1. IoT / telemetry — devices publishing sensor data, subscribing to commands.
  2. Bridging from existing MQTT infrastructure — when the broker is already there.
  3. Lightweight pub/sub — when Kafka is overkill (small payloads, no replay history needed).

For cluster-internal pub/sub, DistributedPubSub is simpler (no external broker).