DistributedPubSub
DistributedPubSub ist die clusterweite Version des lokalen
Event-Streams — Pub/Sub per Topic-Namen, knotenübergreifend.
Jeder Node hostet einen Mediator an einem bekannten Pfad
(/user/pubsub-mediator). Subscriber registrieren sich bei ihrem
lokalen Mediator; Mediators verteilen die Topic→Node-Karte per
Gossip. Beim Veröffentlichen geht die Nachricht an den lokalen
Mediator, der an jeden Node ausfächert, der Subscriber für dieses
Topic hat.
Ein minimales Beispiel
Abschnitt betitelt „Ein minimales Beispiel“import { ActorSystem, Cluster, Props, Actor } from 'actor-ts';import { DistributedPubSub, type DistributedPubSubMediator, Publish, Subscribe } from 'actor-ts/cluster/pubsub';
class ChatMessage { constructor(public readonly user: string, public readonly text: string) {}}
class ChatRoom extends Actor<ChatMessage> { override onReceive(msg: ChatMessage): void { this.log.info(`[chat] ${msg.user}: ${msg.text}`); }}
const system = ActorSystem.create('my-app');const cluster = await Cluster.join(system, { host, port, seeds });const ps = DistributedPubSub.start(system, { cluster });
// Subscribe (typischerweise im preStart eines Actors):const room = system.spawnAnonymous(Props.create(() => new ChatRoom()));ps.mediator.tell(new Subscribe('chat.room.general', room));
// Publish (von überall — von jedem Node, in oder außerhalb eines Actors):ps.mediator.tell(new Publish('chat.room.general', new ChatMessage('alice', 'hi')));Das Publish erreicht jeden Subscriber auf jedem Node — die
Alice-Nachricht kommt bei room an, unabhängig davon, welcher
Node den Publisher hostet.
Die vier Operationen
Abschnitt betitelt „Die vier Operationen“| Nachricht | Was |
|---|---|
Subscribe(topic, ref) | Registriert ref als Subscriber auf topic. Antwortet mit SubscribeAck. |
Unsubscribe(topic, ref) | Entfernt ref aus den Subscribern von topic. |
UnsubscribeAll(ref) | Entfernt ref aus jedem Topic. |
Publish(topic, message) | Schickt message an jeden Subscriber von topic. |
Schicke diese an ps.mediator (ein ActorRef). Nutze ask, wenn
du das Ack brauchst:
import {} from 'actor-ts';
await ps.mediator.ask(new Subscribe('chat.room.general', room));Topic-Namen sind beliebige Strings. Das Framework legt keine
Struktur fest — chat.room.general, user-42.events,
metrics-tier-1 funktionieren alle.
Zur Organisation funktioniert eine punkt-segmentierte Konvention
gut (<domain>.<scope>.<resource>), aber das Framework
interpretiert die Segmente nicht — es ist nur String-Matching.
Wie der Fan-out funktioniert
Abschnitt betitelt „Wie der Fan-out funktioniert“Bei Publish(topic, msg):
- Der lokale Mediator schlägt das Topic in seiner
Map<topic, { local, remoteNodes }>nach. - Lokale Subscriber empfangen direkt —
local.values(), jeder bekommt eintell. - Remote-Nodes mit Subscribern bekommen einen Envelope pro Node (nicht pro Subscriber) — der Mediator auf dem Zielnode fächert an seine Lokalen aus.
Das ergibt eine höchstens-ein-Remote-Hop-Zustellung: ein Publish kettelt nie über mehrere Nodes, um einen Subscriber zu erreichen.
Topic→Node-Gossip
Abschnitt betitelt „Topic→Node-Gossip“Der Mediator hält seine Map<topic, SubscriberSet> lokal, aber
gossipt Deltas zu Peers:
- “Node X hat jetzt Subscriber für Topic Y.”
- “Node X hat keine Subscriber mehr für Topic Y.”
Standard-Gossip-Intervall ist gossipIntervalMs des Clusters
(1 Sekunde). Überschreibe pro Mediator:
DistributedPubSub.start(system, { cluster, gossipIntervalMs: 500 });Niedrigere Intervalle → schnellere Konvergenz nach Subscribe / Unsubscribe, mehr Geplapper. 500 ms ist für Chat-artige Anwendungen vernünftig.
Wenn Subscriber stoppen
Abschnitt betitelt „Wenn Subscriber stoppen“Der Mediator unsubscribet gestoppte Refs nicht automatisch.
Wenn ein Subscriber-Actor stoppt, ohne Unsubscribe zu senden,
versucht der Mediator weiter tell auf ihn — Nachrichten landen
in Dead Letters, das System loggt Warnungen.
Best Practice: im postStop des Subscribers Unsubscribe oder
UnsubscribeAll senden:
class Subscriber extends Actor<...> { override preStart(): void { this.system.extension(...).mediator.tell(new Subscribe('topic', this.self)); } override postStop(): void { this.system.extension(...).mediator.tell(new UnsubscribeAll(this.self)); }}Das Cleanup ist nicht strikt notwendig — Dead-Letter-Routing ist für die Publisher still — aber es vermeidet Log-Rauschen und ungenutzten State im Mediator.
Wann DistributedPubSub einsetzen
Abschnitt betitelt „Wann DistributedPubSub einsetzen“Drei gute Anwendungsfälle:
- Chat / Notifications — mehrere Subscriber (oft auf verschiedenen Nodes), die sich für dasselbe Topic interessieren.
- Systemweite Ankündigungen — ein “schema-updated”-Event, auf das jeder Node reagieren soll.
- Entkoppelter Fan-out über Nodes — wenn der Publisher nicht wissen sollte, wie viele Subscriber existieren oder wo sie leben.
Wann NICHT
Abschnitt betitelt „Wann NICHT“DistributedPubSub vs. Event-Stream
Abschnitt betitelt „DistributedPubSub vs. Event-Stream“Zwei Pub/Sub-Bus-Implementierungen; wähle nach Scope:
| Bus | Scope | Topic-Key |
|---|---|---|
| Event-Stream | Ein ActorSystem | Klasse (instanceof) |
DistributedPubSub | Clusterweit | String-Topic |
Nutze den Event-Stream für In-System-Dispatch; nutze DistributedPubSub, wenn Topics Node-Grenzen überschreiten. Beide können koexistieren — viele Apps nutzen beide für verschiedene Belange.
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- Cluster-Überblick — die Mitgliedschaft darunter.
- Event-Stream — das Single-System-Pub/Sub zum Vergleich.
- Refs über Nodes hinweg — wie die cross-node Zustellungen des Mediators serialisieren.
Die DistributedPubSubMediator
API-Referenz deckt das vollständige Protokoll ab.