Zum Inhalt springen
Deutsch

MQTT

MqttActor integriert mit MQTT-Brokern (Mosquitto, EMQ X, HiveMQ, AWS IoT Core). Unterstützt sowohl MQTT 3.1.1 als auch 5; Topic- Wildcards (+, #); QoS 0/1/2; Retained Messages.

import { ActorSystem, Props, MqttActor } from 'actor-ts';
const mqtt = system.spawn(
Props.create(() => new MqttActor({
url: 'mqtts://mqtt.example.com:8883',
clientId: 'my-app',
username: process.env.MQTT_USER,
password: process.env.MQTT_PASS,
})),
'mqtt',
);
// Ein Topic subscribieren (mit optionalen Wildcards):
mqtt.tell({
kind: 'subscribe',
topic: 'sensors/+/temp',
qos: 1,
target: telemetryHandler,
});
// Publishen:
mqtt.tell({
kind: 'publish',
publish: {
topic: 'commands/device-42',
payload: JSON.stringify({ action: 'reboot' }),
qos: 1,
retain: false,
},
});
interface MqttActorSettings extends BrokerCommonSettings {
url: string; // mqtt:// oder mqtts://
clientId?: string;
username?: string;
password?: string;
keepalive?: number; // Sekunden; Default 60
clean?: boolean; // Default true
protocolVersion?: 3 | 4 | 5; // Default 4 (= MQTT 3.1.1)
will?: {
topic: string;
payload: string | Uint8Array;
qos: 0 | 1 | 2;
retain: boolean;
};
}

Häufige Muster:

  • clean: false — die Session bleibt bestehen; der Broker liefert Nachrichten, die während der Trennung verpasst wurden (abhängig von der Broker-Konfiguration).
  • will: { ... } — der Broker publisht das, wenn der Client unsauber die Verbindung verliert. Nützlich für Präsenz (“device-42-offline”).
  • protocolVersion: 5 — aktiviert MQTT-5-Features (Response- Topics, Message-Ablauf). Braucht einen v5-fähigen Broker.
QoSZustellungKosten
0At-most-once. Fire-and-Forget.Am günstigsten; Nachrichten können verloren gehen.
1At-least-once. Broker speichert bis ACK.Default für die meisten Anwendungsfälle. Duplikate möglich.
2Exactly-once. Voller Handshake.Am langsamsten; selten nötig.

Für IoT-Telemetrie ist QoS 1 die typische Balance. Befehle an Geräte: ebenfalls QoS 1 (oder QoS 2, falls Duplikate schädlich wären). Statusupdates, die schnell überholt werden (Sensorwerte): QoS 0 ist okay.

mqtt.tell({ kind: 'subscribe', topic: 'sensors/+/temp', qos: 1, target: handler });
// passt zu: 'sensors/dev1/temp', 'sensors/dev2/temp', ...
mqtt.tell({ kind: 'subscribe', topic: 'devices/#', qos: 1, target: handler });
// passt zu: 'devices/irgendwas/irgendwo/...'
WildcardTrifft
+Genau eine Ebene.
#Mehrere Ebenen (muss das letzte Segment sein).
mqtt.tell({
kind: 'publish',
publish: {
topic: 'config/device-42',
payload: JSON.stringify({ ... }),
retain: true, // ← vom Broker gespeichert
},
});

Eine Retained Message wird vom Broker gespeichert und an jeden neuen Subscriber sofort ausgeliefert. Verwende sie für:

  • Konfiguration — Geräte, die beim Verbinden die aktuellste Config lesen.
  • Letzter bekannter Status — der zuletzt bekannte Zustand eines Geräts, sichtbar für frische Subscriber.

Setze retain auf false für normale Pub/Sub-Nachrichten — sonst hält der Broker jede alte Nachricht für immer sichtbar.

Subscriber empfangen MqttMessage:

interface MqttMessage {
readonly topic: string;
readonly payload: Uint8Array;
readonly qos: 0 | 1 | 2;
readonly retain: boolean;
}
class TelemetryHandler extends Actor<MqttMessage> {
override onReceive(msg: MqttMessage): void {
const data = JSON.parse(new TextDecoder().decode(msg.payload));
this.log.info(`${msg.topic}: ${data.value}`);
}
}
Terminal-Fenster
npm install mqtt
# oder: bun add mqtt

Drei gute Einsatzfälle:

  1. IoT / Telemetrie — Geräte publishen Sensordaten und subscribieren Befehle.
  2. Brücken aus bestehender MQTT-Infrastruktur — wenn der Broker schon da ist.
  3. Leichtgewichtiges Pub/Sub — wenn Kafka Overkill ist (kleine Payloads, keine Replay-Historie nötig).

Für clusterinternes Pub/Sub ist DistributedPubSub einfacher (kein externer Broker).