Event stream

The event stream is a system-wide bus for one-to-many messaging that doesn’t fit the parent-child or sender-receiver shape. Any actor can subscribe to a class of event; any code can publish; the bus matches and tells the event to every subscriber.

import { Actor, ActorSystem, Props } from 'actor-ts';
// The event is a plain class; its constructor doubles as the channel.
class UserLoggedIn {
  constructor(public readonly userId: string) {}
}
class AuditLogger extends Actor<UserLoggedIn> {
  override preStart(): void {
    this.context.system.eventStream.subscribe(this.context.self, UserLoggedIn);
  }
  override onReceive(event: UserLoggedIn): void {
    this.log.info(`user ${event.userId} logged in`);
  }
}
class MetricsCollector extends Actor<UserLoggedIn> {
  private readonly counters = new Map<string, number>();
  override preStart(): void {
    this.context.system.eventStream.subscribe(this.context.self, UserLoggedIn);
  }
  override onReceive(event: UserLoggedIn): void {
    this.incrementCounter('logins');
  }
  // Illustrative helper defined on this actor; not part of actor-ts.
  private incrementCounter(name: string): void {
    this.counters.set(name, (this.counters.get(name) ?? 0) + 1);
  }
}
const system = ActorSystem.create('demo');
system.actorOf(Props.create(() => new AuditLogger()));
system.actorOf(Props.create(() => new MetricsCollector()));
// Anywhere, including outside an actor:
system.eventStream.publish(new UserLoggedIn('user-42'));
// → both subscribers receive the event

The publisher doesn’t know how many subscribers exist, and the subscribers don’t know who’s publishing. That’s the whole point — loose coupling for cross-cutting concerns (logging, metrics, audit, fan-out alerts).

class EventStream {
  subscribe<T>(
    subscriber: ActorRef,
    channel: new (...args: any[]) => T,
    predicate?: (event: T) => boolean,
  ): boolean;
  unsubscribe<T>(subscriber: ActorRef, channel?: new (...args: any[]) => T): boolean;
  publish(event: object): void;
}

Three operations:

  • subscribe(ref, ClassName) — register interest in a class of events. Returns true if a new subscription was added, false if a duplicate was ignored.
  • unsubscribe(ref, ClassName?) — remove subscriptions. With the class, just that channel; without, every subscription for that ref.
  • publish(event) — fire-and-forget. The bus walks its subscription list, matches via instanceof, and tells each match.
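
To make the three operations concrete, here is a minimal in-memory sketch of a bus with the same shape. It is a toy model, not the library's implementation: `ToyRef` and `ToyEventStream` are hypothetical names, `ToyRef` stands in for `ActorRef`, and the meaning of the `unsubscribe` return value (true when something was removed) is an assumption, since the text above only gives its type.

```typescript
// Hypothetical stand-in for ActorRef: anything that can be told a message.
interface ToyRef {
  tell(message: object): void;
}

type Channel<T> = new (...args: any[]) => T;

interface Subscription {
  subscriber: ToyRef;
  channel: Channel<object>;
}

// Toy model of the bus, not the library's implementation.
class ToyEventStream {
  private subscriptions: Subscription[] = [];

  // Returns true if a new subscription was added, false for a duplicate.
  subscribe<T extends object>(subscriber: ToyRef, channel: Channel<T>): boolean {
    const duplicate = this.subscriptions.some(
      (s) => s.subscriber === subscriber && s.channel === channel,
    );
    if (duplicate) return false;
    this.subscriptions.push({ subscriber, channel });
    return true;
  }

  // Assumption: returns true if at least one subscription was removed.
  unsubscribe<T extends object>(subscriber: ToyRef, channel?: Channel<T>): boolean {
    const kept = this.subscriptions.filter(
      (s) => !(s.subscriber === subscriber && (channel === undefined || s.channel === channel)),
    );
    const removed = kept.length < this.subscriptions.length;
    this.subscriptions = kept;
    return removed;
  }

  // Fire-and-forget: tell every subscriber whose channel matches via instanceof.
  publish(event: object): void {
    for (const s of this.subscriptions) {
      if (event instanceof s.channel) s.subscriber.tell(event);
    }
  }
}

// Usage with two hypothetical event classes.
class UserLoggedIn {
  constructor(public readonly userId: string) {}
}
class CacheFlushed {
  constructor(public readonly region: string) {}
}

const received: object[] = [];
const auditor: ToyRef = { tell: (message) => { received.push(message); } };

const stream = new ToyEventStream();
stream.subscribe(auditor, UserLoggedIn); // true: new subscription
stream.subscribe(auditor, UserLoggedIn); // false: duplicate ignored
stream.subscribe(auditor, CacheFlushed); // true: a different channel
stream.publish(new UserLoggedIn('user-42')); // delivered once
stream.unsubscribe(auditor);                 // no class given: removes both channels
stream.publish(new CacheFlushed('eu'));      // no subscribers left, nothing delivered
```

The publisher never touches the subscription list directly, which is what keeps it unaware of who is listening.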

Matching is instanceof-based, so subclass instances reach base-class subscribers:

class SystemEvent {}
class UserLoggedIn extends SystemEvent {}
class UserLoggedOut extends SystemEvent {}
eventStream.subscribe(auditor, SystemEvent); // catches both
eventStream.subscribe(loginCounter, UserLoggedIn); // catches only the in-events

This makes hierarchical event taxonomies easy: subscribers pick their level.
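
The matching rule can be checked with plain `instanceof`, without any bus at all. The sketch below uses a hypothetical helper, `channelsReached`, that lists which channels an event would reach. It also shows the reverse direction: a bare `SystemEvent` does not reach subscribers of a subclass channel.

```typescript
class SystemEvent {}
class UserLoggedIn extends SystemEvent {}
class UserLoggedOut extends SystemEvent {}

type Channel = new (...args: any[]) => object;

// Hypothetical helper: labels of the channels whose subscribers would be told `event`.
function channelsReached(
  event: object,
  channels: ReadonlyArray<{ label: string; channel: Channel }>,
): string[] {
  return channels.filter((c) => event instanceof c.channel).map((c) => c.label);
}

const channels = [
  { label: 'SystemEvent', channel: SystemEvent },
  { label: 'UserLoggedIn', channel: UserLoggedIn },
  { label: 'UserLoggedOut', channel: UserLoggedOut },
];

const forLogin = channelsReached(new UserLoggedIn(), channels);   // ['SystemEvent', 'UserLoggedIn']
const forLogout = channelsReached(new UserLoggedOut(), channels); // ['SystemEvent', 'UserLoggedOut']
const forBase = channelsReached(new SystemEvent(), channels);     // ['SystemEvent']
```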

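subscribe also accepts an optional predicate as its third argument; the bus delivers an event to that subscriber only when the predicate returns true:

eventStream.subscribe(
  metricsActor,
  HttpResponse,
  (event) => event.status >= 500, // only deliver 5xx responses
);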

For high-frequency channels (cluster events, every HTTP response, every metric tick), filtering on the bus side is cheaper than filtering inside each subscriber’s onReceive.

Three details:

  • No dedup for predicate-bearing subscriptions. Without a predicate, the bus dedups per (subscriber, channel) pair — re-calling subscribe is a no-op. With a predicate, every call adds a new subscription, because predicate functions have no identity contract. “Replace this filter” means unsubscribe then subscribe again.
  • A throwing predicate is treated as “no match” for that delivery. Other subscribers are unaffected; the subscription stays active. A warning is logged via the system logger.
  • Predicates run at publish time, on the publisher’s stack. Keep them fast and pure — a heavy predicate slows every publish.
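
The three details above can be modelled in a few lines. This is a sketch under stated assumptions, not the library's code: `ToyRef` and `ToyFilteringStream` are hypothetical names, the `warnings` array stands in for the system logger, and the choice to compare a predicate-less `subscribe` only against other predicate-less entries is an assumption the text does not settle.

```typescript
// Hypothetical stand-in for ActorRef.
interface ToyRef {
  tell(message: object): void;
}

type Channel<T> = new (...args: any[]) => T;

interface Subscription {
  subscriber: ToyRef;
  channel: Channel<object>;
  predicate: ((event: any) => boolean) | undefined;
}

// Toy model of predicate handling, not the library's implementation.
class ToyFilteringStream {
  private readonly subscriptions: Subscription[] = [];
  readonly warnings: string[] = []; // stand-in for the system logger

  subscribe<T extends object>(
    subscriber: ToyRef,
    channel: Channel<T>,
    predicate?: (event: T) => boolean,
  ): boolean {
    // Dedup applies only when no predicate is given.
    // Assumption: only other predicate-less entries count as duplicates.
    if (predicate === undefined) {
      const duplicate = this.subscriptions.some(
        (s) => s.subscriber === subscriber && s.channel === channel && s.predicate === undefined,
      );
      if (duplicate) return false;
    }
    this.subscriptions.push({ subscriber, channel, predicate });
    return true;
  }

  publish(event: object): void {
    for (const s of this.subscriptions) {
      if (!(event instanceof s.channel)) continue;
      let matches = true;
      if (s.predicate !== undefined) {
        try {
          matches = s.predicate(event); // runs synchronously, on the publisher's stack
        } catch (error) {
          matches = false; // a throwing predicate counts as "no match"
          this.warnings.push(`predicate threw: ${String(error)}`);
        }
      }
      if (matches) s.subscriber.tell(event);
    }
  }
}

class HttpResponse {
  constructor(public readonly status: number) {}
}

let delivered = 0;
const metrics: ToyRef = { tell: () => { delivered += 1; } };
const stream = new ToyFilteringStream();
const isServerError = (event: HttpResponse) => event.status >= 500;

stream.subscribe(metrics, HttpResponse, isServerError); // true
stream.subscribe(metrics, HttpResponse, isServerError); // true again: no dedup with a predicate
stream.publish(new HttpResponse(200)); // filtered out by both subscriptions
stream.publish(new HttpResponse(503)); // told twice in this toy model, once per subscription
```

In this model the double subscription leads to a double delivery, which is why replacing a filter should be an unsubscribe followed by a fresh subscribe rather than a second subscribe call.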

Three good fits:

  1. Cross-cutting observation — logging, metrics, audit. Many subscribers, none of which the publisher should know about.
  2. System-wide notifications — “the cluster gained a member,” “the cache flushed.” The cluster extension publishes its own events here; you can subscribe and react.
  3. De-coupling actor wiring in tests — instead of plumbing a spy ref through five layers, publish on the stream and let the spy subscribe directly.

The framework publishes several event types you can subscribe to:

| Event | Where it comes from |
| --- | --- |
| DeadLetter | Every message routed to /deadLetters is also published. Useful for “alarms on lost messages.” |
| ClusterEvent (subclasses) | MemberUp, MemberRemoved, UnreachableMember, etc. See Cluster. |
| JournalEvent | Persisted events emitted by PersistentActors, for Projection consumers. |
| Broker events | BrokerActor subclasses publish connect/disconnect/lag events. |

See each extension’s docs for the full event vocabulary it publishes.

  • DistributedPubSub — cluster-wide publish/subscribe with topic routing; complementary to the local event stream.
  • Actor system: system.eventStream is where you reach the bus.
  • Messages — the shape rules (immutable, no method refs) apply equally to events.

The EventStream API reference covers the full surface.