Event-Stream
Der Event-Stream ist ein systemweiter Bus für
One-to-Many-Messaging, das nicht in die Eltern-Kind- oder
Sender-Empfänger-Form passt. Jeder Actor kann eine Klasse von Event
abonnieren; jeder Code kann publishen; der Bus matcht und tellt das
Event an jeden Subscriber.
import { Actor, ActorSystem, Props } from 'actor-ts';
class UserLoggedIn { constructor(public readonly userId: string) {}}
class AuditLogger extends Actor<UserLoggedIn> { override preStart(): void { this.context.system.eventStream.subscribe(this.context.self, UserLoggedIn); } override onReceive(event: UserLoggedIn): void { this.log.info(`user ${event.userId} logged in`); }}
class MetricsCollector extends Actor<UserLoggedIn> { override preStart(): void { this.context.system.eventStream.subscribe(this.context.self, UserLoggedIn); } override onReceive(event: UserLoggedIn): void { this.incrementCounter('logins'); }}
const system = ActorSystem.create('demo');system.spawnAnonymous(Props.create(() => new AuditLogger()));system.spawnAnonymous(Props.create(() => new MetricsCollector()));
// Überall — auch außerhalb eines Actors:system.eventStream.publish(new UserLoggedIn('user-42'));// → beide Subscriber empfangen das EventDer Publisher weiß nicht, wie viele Subscriber existieren, und die Subscriber wissen nicht, wer publisht. Das ist der ganze Sinn — lose Kopplung für Querschnitt-Anliegen (Logging, Metriken, Audit, Fan-Out-Alerts).
Die API
Abschnitt betitelt „Die API“class EventStream { subscribe<T>( subscriber: ActorRef, channel: new (...args: any[]) => T, predicate?: (event: T) => boolean, ): boolean;
unsubscribe<T>(subscriber: ActorRef, channel?: new (...args: any[]) => T): boolean;
publish(event: object): void;}Drei Operationen:
subscribe(ref, ClassName)— Interesse an einer Klasse von Events registrieren. Gibttruezurück, wenn ein neues Abo hinzugefügt wurde,false, wenn ein Duplikat ignoriert wurde.unsubscribe(ref, ClassName?)— Abos entfernen. Mit der Klasse: nur diesen Channel; ohne: jedes Abo für diese Ref.publish(event)— Fire-and-Forget. Der Bus läuft seine Abo-Liste durch, matcht viainstanceofundtellt jeden Match.
Matching ist instanceof-basiert, Subklassen-Instanzen erreichen
also Base-Class-Subscriber:
class SystemEvent {}class UserLoggedIn extends SystemEvent {}class UserLoggedOut extends SystemEvent {}
eventStream.subscribe(auditor, SystemEvent); // fängt beideseventStream.subscribe(loginCounter, UserLoggedIn); // fängt nur die In-EventsDas macht hierarchische Event-Taxonomien einfach: Subscriber wählen ihr Level.
Predicate-Filterung
Abschnitt betitelt „Predicate-Filterung“eventStream.subscribe( metricsActor, HttpResponse, (event) => event.status >= 500, // nur 5xx-Responses liefern);Für hochfrequente Channels (Cluster-Events, jede HTTP-Response,
jeder Metric-Tick) ist Filtern auf der Bus-Seite günstiger als
Filtern innerhalb des onReceive jedes Subscribers.
Drei Details:
- Kein Dedup für Predicate-tragende Abos. Ohne Predicate
deduppt der Bus pro
(subscriber, channel)-Paar —subscribeerneut aufzurufen, ist ein No-Op. Mit einem Predicate fügt jeder Aufruf ein neues Abo hinzu, weil Predicate-Funktionen keinen Identitätsvertrag haben. “Diesen Filter ersetzen” heißtunsubscribeund dann erneutsubscribe. - Ein throwendes Predicate wird als “kein Match” für diese Auslieferung behandelt. Andere Subscriber sind unberührt; das Abo bleibt aktiv. Eine Warnung wird über den System-Logger geloggt.
- Predicates laufen zur Publish-Zeit, auf dem Stack des Publishers. Halte sie schnell und rein — ein schwerer Predicate verlangsamt jedes Publish.
Wann den Event-Stream verwenden
Abschnitt betitelt „Wann den Event-Stream verwenden“Drei gute Passungen:
- Querschnitt-Beobachtung — Logging, Metriken, Audit. Viele Subscriber, von denen der Publisher keinen kennen soll.
- Systemweite Benachrichtigungen — “der Cluster hat ein Mitglied gewonnen”, “der Cache wurde geflusht”. Die Cluster-Extension publisht ihre eigenen Events hier; du kannst abonnieren und reagieren.
- Entkoppeln von Actor-Verdrahtung in Tests — statt eine Spy-Ref durch fünf Schichten zu legen, publishe auf dem Stream und lass den Spy direkt abonnieren.
Wann NICHT verwenden
Abschnitt betitelt „Wann NICHT verwenden“Eingebaute Events, die abonniert werden können
Abschnitt betitelt „Eingebaute Events, die abonniert werden können“Das Framework publisht mehrere Event-Typen, die du abonnieren kannst:
| Event | Woher es kommt |
|---|---|
DeadLetter | Jede an /deadLetters geroutete Nachricht wird auch publisht. Nützlich für “Alarme auf verlorene Nachrichten”. |
ClusterEvent (Subklassen) | MemberUp, MemberRemoved, UnreachableMember, etc. Siehe Cluster. |
JournalEvent | Persistierte Events, emittiert von PersistentActors, für Projection-Consumer. |
| Broker-Events | BrokerActor-Subklassen publishen Connect/Disconnect/Lag-Events. |
Siehe die Docs jeder Extension für das volle Event-Vokabular, das sie publisht.
Wie es weitergeht
Abschnitt betitelt „Wie es weitergeht“- DistributedPubSub — cluster-weites Publish/Subscribe mit Topic-Routing; komplementär zum lokalen Event-Stream.
- Actor-System —
system.eventStreamist, wo du den Bus erreichst. - Nachrichten — die Form-Regeln (unveränderlich, keine Method-Refs) gelten genauso für Events.
Die EventStream-API-Referenz deckt
die volle Schnittstelle ab.