Zum Inhalt springen
Deutsch

Event-Stream

Der Event-Stream ist ein systemweiter Bus für One-to-Many-Messaging, das nicht in die Eltern-Kind- oder Sender-Empfänger-Form passt. Jeder Actor kann eine Klasse von Event abonnieren; jeder Code kann publishen; der Bus matcht und tellt das Event an jeden Subscriber.

import { Actor, ActorSystem, Props } from 'actor-ts';
class UserLoggedIn {
constructor(public readonly userId: string) {}
}
class AuditLogger extends Actor<UserLoggedIn> {
override preStart(): void {
this.context.system.eventStream.subscribe(this.context.self, UserLoggedIn);
}
override onReceive(event: UserLoggedIn): void {
this.log.info(`user ${event.userId} logged in`);
}
}
class MetricsCollector extends Actor<UserLoggedIn> {
override preStart(): void {
this.context.system.eventStream.subscribe(this.context.self, UserLoggedIn);
}
override onReceive(event: UserLoggedIn): void {
this.incrementCounter('logins');
}
}
const system = ActorSystem.create('demo');
system.spawnAnonymous(Props.create(() => new AuditLogger()));
system.spawnAnonymous(Props.create(() => new MetricsCollector()));
// Überall — auch außerhalb eines Actors:
system.eventStream.publish(new UserLoggedIn('user-42'));
// → beide Subscriber empfangen das Event

Der Publisher weiß nicht, wie viele Subscriber existieren, und die Subscriber wissen nicht, wer publisht. Das ist der ganze Sinn — lose Kopplung für Querschnitt-Anliegen (Logging, Metriken, Audit, Fan-Out-Alerts).

class EventStream {
subscribe<T>(
subscriber: ActorRef,
channel: new (...args: any[]) => T,
predicate?: (event: T) => boolean,
): boolean;
unsubscribe<T>(subscriber: ActorRef, channel?: new (...args: any[]) => T): boolean;
publish(event: object): void;
}

Drei Operationen:

  • subscribe(ref, ClassName) — Interesse an einer Klasse von Events registrieren. Gibt true zurück, wenn ein neues Abo hinzugefügt wurde, false, wenn ein Duplikat ignoriert wurde.
  • unsubscribe(ref, ClassName?) — Abos entfernen. Mit der Klasse: nur diesen Channel; ohne: jedes Abo für diese Ref.
  • publish(event) — Fire-and-Forget. Der Bus läuft seine Abo-Liste durch, matcht via instanceof und tellt jeden Match.

Matching ist instanceof-basiert, Subklassen-Instanzen erreichen also Base-Class-Subscriber:

class SystemEvent {}
class UserLoggedIn extends SystemEvent {}
class UserLoggedOut extends SystemEvent {}
eventStream.subscribe(auditor, SystemEvent); // fängt beides
eventStream.subscribe(loginCounter, UserLoggedIn); // fängt nur die In-Events

Das macht hierarchische Event-Taxonomien einfach: Subscriber wählen ihr Level.

eventStream.subscribe(
metricsActor,
HttpResponse,
(event) => event.status >= 500, // nur 5xx-Responses liefern
);

Für hochfrequente Channels (Cluster-Events, jede HTTP-Response, jeder Metric-Tick) ist Filtern auf der Bus-Seite günstiger als Filtern innerhalb des onReceive jedes Subscribers.

Drei Details:

  • Kein Dedup für Predicate-tragende Abos. Ohne Predicate deduppt der Bus pro (subscriber, channel)-Paar — subscribe erneut aufzurufen, ist ein No-Op. Mit einem Predicate fügt jeder Aufruf ein neues Abo hinzu, weil Predicate-Funktionen keinen Identitätsvertrag haben. “Diesen Filter ersetzen” heißt unsubscribe und dann erneut subscribe.
  • Ein throwendes Predicate wird als “kein Match” für diese Auslieferung behandelt. Andere Subscriber sind unberührt; das Abo bleibt aktiv. Eine Warnung wird über den System-Logger geloggt.
  • Predicates laufen zur Publish-Zeit, auf dem Stack des Publishers. Halte sie schnell und rein — ein schwerer Predicate verlangsamt jedes Publish.

Drei gute Passungen:

  1. Querschnitt-Beobachtung — Logging, Metriken, Audit. Viele Subscriber, von denen der Publisher keinen kennen soll.
  2. Systemweite Benachrichtigungen — “der Cluster hat ein Mitglied gewonnen”, “der Cache wurde geflusht”. Die Cluster-Extension publisht ihre eigenen Events hier; du kannst abonnieren und reagieren.
  3. Entkoppeln von Actor-Verdrahtung in Tests — statt eine Spy-Ref durch fünf Schichten zu legen, publishe auf dem Stream und lass den Spy direkt abonnieren.

Das Framework publisht mehrere Event-Typen, die du abonnieren kannst:

EventWoher es kommt
DeadLetterJede an /deadLetters geroutete Nachricht wird auch publisht. Nützlich für “Alarme auf verlorene Nachrichten”.
ClusterEvent (Subklassen)MemberUp, MemberRemoved, UnreachableMember, etc. Siehe Cluster.
JournalEventPersistierte Events, emittiert von PersistentActors, für Projection-Consumer.
Broker-EventsBrokerActor-Subklassen publishen Connect/Disconnect/Lag-Events.

Siehe die Docs jeder Extension für das volle Event-Vokabular, das sie publisht.

  • DistributedPubSub — cluster-weites Publish/Subscribe mit Topic-Routing; komplementär zum lokalen Event-Stream.
  • Actor-Systemsystem.eventStream ist, wo du den Bus erreichst.
  • Nachrichten — die Form-Regeln (unveränderlich, keine Method-Refs) gelten genauso für Events.

Die EventStream-API-Referenz deckt die volle Schnittstelle ab.