MQTT
MqttActor integriert mit MQTT-Brokern (Mosquitto, EMQ X, HiveMQ,
AWS IoT Core). Unterstützt sowohl MQTT 3.1.1 als auch 5; Topic-
Wildcards (+, #); QoS 0/1/2; Retained Messages.
import { ActorSystem, Props, MqttActor } from 'actor-ts';
const mqtt = system.spawn( Props.create(() => new MqttActor({ url: 'mqtts://mqtt.example.com:8883', clientId: 'my-app', username: process.env.MQTT_USER, password: process.env.MQTT_PASS, })), 'mqtt',);
// Ein Topic subscribieren (mit optionalen Wildcards):mqtt.tell({ kind: 'subscribe', topic: 'sensors/+/temp', qos: 1, target: telemetryHandler,});
// Publishen:mqtt.tell({ kind: 'publish', publish: { topic: 'commands/device-42', payload: JSON.stringify({ action: 'reboot' }), qos: 1, retain: false, },});Settings
Abschnitt betitelt „Settings“interface MqttActorSettings extends BrokerCommonSettings { url: string; // mqtt:// oder mqtts:// clientId?: string; username?: string; password?: string; keepalive?: number; // Sekunden; Default 60 clean?: boolean; // Default true protocolVersion?: 3 | 4 | 5; // Default 4 (= MQTT 3.1.1) will?: { topic: string; payload: string | Uint8Array; qos: 0 | 1 | 2; retain: boolean; };}Häufige Muster:
clean: false— die Session bleibt bestehen; der Broker liefert Nachrichten, die während der Trennung verpasst wurden (abhängig von der Broker-Konfiguration).will: { ... }— der Broker publisht das, wenn der Client unsauber die Verbindung verliert. Nützlich für Präsenz (“device-42-offline”).protocolVersion: 5— aktiviert MQTT-5-Features (Response- Topics, Message-Ablauf). Braucht einen v5-fähigen Broker.
QoS-Level
Abschnitt betitelt „QoS-Level“| QoS | Zustellung | Kosten |
|---|---|---|
| 0 | At-most-once. Fire-and-Forget. | Am günstigsten; Nachrichten können verloren gehen. |
| 1 | At-least-once. Broker speichert bis ACK. | Default für die meisten Anwendungsfälle. Duplikate möglich. |
| 2 | Exactly-once. Voller Handshake. | Am langsamsten; selten nötig. |
Für IoT-Telemetrie ist QoS 1 die typische Balance. Befehle an Geräte: ebenfalls QoS 1 (oder QoS 2, falls Duplikate schädlich wären). Statusupdates, die schnell überholt werden (Sensorwerte): QoS 0 ist okay.
Topic-Wildcards
Abschnitt betitelt „Topic-Wildcards“mqtt.tell({ kind: 'subscribe', topic: 'sensors/+/temp', qos: 1, target: handler });// passt zu: 'sensors/dev1/temp', 'sensors/dev2/temp', ...
mqtt.tell({ kind: 'subscribe', topic: 'devices/#', qos: 1, target: handler });// passt zu: 'devices/irgendwas/irgendwo/...'| Wildcard | Trifft |
|---|---|
+ | Genau eine Ebene. |
# | Mehrere Ebenen (muss das letzte Segment sein). |
Retained Messages
Abschnitt betitelt „Retained Messages“mqtt.tell({ kind: 'publish', publish: { topic: 'config/device-42', payload: JSON.stringify({ ... }), retain: true, // ← vom Broker gespeichert },});Eine Retained Message wird vom Broker gespeichert und an jeden neuen Subscriber sofort ausgeliefert. Verwende sie für:
- Konfiguration — Geräte, die beim Verbinden die aktuellste Config lesen.
- Letzter bekannter Status — der zuletzt bekannte Zustand eines Geräts, sichtbar für frische Subscriber.
Setze retain auf false für normale Pub/Sub-Nachrichten — sonst
hält der Broker jede alte Nachricht für immer sichtbar.
Eingehende Nachrichten
Abschnitt betitelt „Eingehende Nachrichten“Subscriber empfangen MqttMessage:
interface MqttMessage { readonly topic: string; readonly payload: Uint8Array; readonly qos: 0 | 1 | 2; readonly retain: boolean;}
class TelemetryHandler extends Actor<MqttMessage> { override onReceive(msg: MqttMessage): void { const data = JSON.parse(new TextDecoder().decode(msg.payload)); this.log.info(`${msg.topic}: ${data.value}`); }}Peer-Dependency
Abschnitt betitelt „Peer-Dependency“npm install mqtt# oder: bun add mqttWann MQTT
Abschnitt betitelt „Wann MQTT“Drei gute Einsatzfälle:
- IoT / Telemetrie — Geräte publishen Sensordaten und subscribieren Befehle.
- Brücken aus bestehender MQTT-Infrastruktur — wenn der Broker schon da ist.
- Leichtgewichtiges Pub/Sub — wenn Kafka Overkill ist (kleine Payloads, keine Replay-Historie nötig).
Für clusterinternes Pub/Sub ist DistributedPubSub einfacher (kein externer Broker).
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- I/O-Übersicht — das große Bild.
- BrokerActor-Basis — der gemeinsame Lifecycle.
- Kafka — für durables Streaming.
- NATS — für leichtgewichtiges Pub/Sub.