Zum Inhalt springen
Deutsch

DistributedPubSub

DistributedPubSub ist die clusterweite Version des lokalen Event-Streams — Pub/Sub per Topic-Namen, knotenübergreifend.

per Gossip bekannte Subs

auf anderen Nodes

Publisher

Mediator von node-A

lokaler Sub auf node-A

Mediator von node-B

lokaler Sub auf node-B

Jeder Node hostet einen Mediator an einem bekannten Pfad (/user/pubsub-mediator). Subscriber registrieren sich bei ihrem lokalen Mediator; Mediators verteilen die Topic→Node-Karte per Gossip. Beim Veröffentlichen geht die Nachricht an den lokalen Mediator, der an jeden Node ausfächert, der Subscriber für dieses Topic hat.

import { ActorSystem, Cluster, Props, Actor } from 'actor-ts';
import { DistributedPubSub, type DistributedPubSubMediator, Publish, Subscribe } from 'actor-ts/cluster/pubsub';
class ChatMessage {
constructor(public readonly user: string, public readonly text: string) {}
}
class ChatRoom extends Actor<ChatMessage> {
override onReceive(msg: ChatMessage): void {
this.log.info(`[chat] ${msg.user}: ${msg.text}`);
}
}
const system = ActorSystem.create('my-app');
const cluster = await Cluster.join(system, { host, port, seeds });
const ps = DistributedPubSub.start(system, { cluster });
// Subscribe (typischerweise im preStart eines Actors):
const room = system.spawnAnonymous(Props.create(() => new ChatRoom()));
ps.mediator.tell(new Subscribe('chat.room.general', room));
// Publish (von überall — von jedem Node, in oder außerhalb eines Actors):
ps.mediator.tell(new Publish('chat.room.general', new ChatMessage('alice', 'hi')));

Das Publish erreicht jeden Subscriber auf jedem Node — die Alice-Nachricht kommt bei room an, unabhängig davon, welcher Node den Publisher hostet.

NachrichtWas
Subscribe(topic, ref)Registriert ref als Subscriber auf topic. Antwortet mit SubscribeAck.
Unsubscribe(topic, ref)Entfernt ref aus den Subscribern von topic.
UnsubscribeAll(ref)Entfernt ref aus jedem Topic.
Publish(topic, message)Schickt message an jeden Subscriber von topic.

Schicke diese an ps.mediator (ein ActorRef). Nutze ask, wenn du das Ack brauchst:

import {} from 'actor-ts';
await ps.mediator.ask(new Subscribe('chat.room.general', room));

Topic-Namen sind beliebige Strings. Das Framework legt keine Struktur fest — chat.room.general, user-42.events, metrics-tier-1 funktionieren alle.

Zur Organisation funktioniert eine punkt-segmentierte Konvention gut (<domain>.<scope>.<resource>), aber das Framework interpretiert die Segmente nicht — es ist nur String-Matching.

Bei Publish(topic, msg):

  1. Der lokale Mediator schlägt das Topic in seiner Map<topic, { local, remoteNodes }> nach.
  2. Lokale Subscriber empfangen direkt — local.values(), jeder bekommt ein tell.
  3. Remote-Nodes mit Subscribern bekommen einen Envelope pro Node (nicht pro Subscriber) — der Mediator auf dem Zielnode fächert an seine Lokalen aus.

Das ergibt eine höchstens-ein-Remote-Hop-Zustellung: ein Publish kettelt nie über mehrere Nodes, um einen Subscriber zu erreichen.

Der Mediator hält seine Map<topic, SubscriberSet> lokal, aber gossipt Deltas zu Peers:

  • “Node X hat jetzt Subscriber für Topic Y.”
  • “Node X hat keine Subscriber mehr für Topic Y.”

Standard-Gossip-Intervall ist gossipIntervalMs des Clusters (1 Sekunde). Überschreibe pro Mediator:

DistributedPubSub.start(system, { cluster, gossipIntervalMs: 500 });

Niedrigere Intervalle → schnellere Konvergenz nach Subscribe / Unsubscribe, mehr Geplapper. 500 ms ist für Chat-artige Anwendungen vernünftig.

Der Mediator unsubscribet gestoppte Refs nicht automatisch. Wenn ein Subscriber-Actor stoppt, ohne Unsubscribe zu senden, versucht der Mediator weiter tell auf ihn — Nachrichten landen in Dead Letters, das System loggt Warnungen.

Best Practice: im postStop des Subscribers Unsubscribe oder UnsubscribeAll senden:

class Subscriber extends Actor<...> {
override preStart(): void {
this.system.extension(...).mediator.tell(new Subscribe('topic', this.self));
}
override postStop(): void {
this.system.extension(...).mediator.tell(new UnsubscribeAll(this.self));
}
}

Das Cleanup ist nicht strikt notwendig — Dead-Letter-Routing ist für die Publisher still — aber es vermeidet Log-Rauschen und ungenutzten State im Mediator.

Drei gute Anwendungsfälle:

  1. Chat / Notifications — mehrere Subscriber (oft auf verschiedenen Nodes), die sich für dasselbe Topic interessieren.
  2. Systemweite Ankündigungen — ein “schema-updated”-Event, auf das jeder Node reagieren soll.
  3. Entkoppelter Fan-out über Nodes — wenn der Publisher nicht wissen sollte, wie viele Subscriber existieren oder wo sie leben.

Zwei Pub/Sub-Bus-Implementierungen; wähle nach Scope:

BusScopeTopic-Key
Event-StreamEin ActorSystemKlasse (instanceof)
DistributedPubSubClusterweitString-Topic

Nutze den Event-Stream für In-System-Dispatch; nutze DistributedPubSub, wenn Topics Node-Grenzen überschreiten. Beide können koexistieren — viele Apps nutzen beide für verschiedene Belange.

Die DistributedPubSubMediator API-Referenz deckt das vollständige Protokoll ab.