BrokerActor base class
Every protocol actor in actor-ts/io/broker — KafkaActor,
MqttActor, NatsActor, etc. — extends BrokerActor. The base
class owns the shared lifecycle: connection state machine,
reconnect-with-backoff, outbound buffer, subscriber fan-out,
lifecycle event publishing.
BrokerActor (abstract base)├── lifecycle state machine├── outbound buffer├── reconnect loop├── subscriber tracking└── three abstract methods: ├── connectImpl() ├── disconnectImpl() └── dispatchOutgoing(envelope)Subclasses implement the three protocol hooks; the base handles the rest. This page documents what’s shared. For per-protocol specifics, see the per-protocol pages.
The state machine
Section titled “The state machine”disconnected ──connect──► connecting ──ok──► connected ▲ │ │ │ fail ▼ │ ▼ disconnecting │ reconnect cycle │ └────────────────────────┴──────────────────┘Four states:
disconnected— initial; not currently connected.connecting—connectImplis running.connected— connection up; messages flow.disconnecting—disconnectImplis running.
A disconnected → connecting → failure triggers a reconnect
loop: backoff + retry until success or maxAttempts
exhausted.
Subclass contract
Section titled “Subclass contract”abstract class BrokerActor<S, Cmd, P> extends Actor<Cmd> { // Subclasses implement: protected abstract configKey(): string; protected abstract builtInDefaults(): Partial<S>; protected abstract readSettingsFromConfig(config: Config): Partial<S>; protected abstract requiredSettings(): ReadonlyArray<keyof S>; protected abstract endpointLabel(): string;
protected abstract connectImpl(): Promise<void>; protected abstract disconnectImpl(): Promise<void>; protected abstract dispatchOutgoing(envelope: OutboundEnvelope<P>): Promise<void>;}Three categories:
- Settings glue (
configKey,builtInDefaults,readSettingsFromConfig,requiredSettings,endpointLabel) — describes how to assemble settings from the three layers (constructor + HOCON + defaults) and how to validate. - Protocol hooks (
connectImpl,disconnectImpl,dispatchOutgoing) — the protocol-specific work.
endpointLabel is the human-readable connection identity
(“amqp://localhost:5672”, “kafka-cluster-1”) used in log lines
and lifecycle events.
What the base does for you
Section titled “What the base does for you”Outbound buffering
Section titled “Outbound buffering”this.outbound({ topic: 'orders', payload: ... });Subclasses call this.outbound(envelope) to send. The base
class:
- If connected — calls
dispatchOutgoing(envelope)immediately. - If disconnected — buffers up to
outboundBuffer.capacityenvelopes. On reconnect, drains the buffer.
Overflow per policy: 'drop-oldest' / 'drop-new' / 'reject'
(throw BrokerBufferOverflow).
Subscriber tracking
Section titled “Subscriber tracking”this.onReceive(msg) { if (msg.kind === 'subscribe') { this.subscribe(msg.topic, msg.subscriber); } if (msg.kind === 'unsubscribe') { this.unsubscribe(msg.topic, msg.subscriber); }}subscribe(topic, ref) registers ref as interested in
topic’s inbound messages. The base death-watches the
ref — when it stops, the subscription is removed automatically.
No leaks.
When the protocol pushes an inbound message, the subclass calls:
this.fanOut(topic, inboundMessage);The base delivers to every subscriber for that topic.
Reconnect-with-backoff
Section titled “Reconnect-with-backoff”reconnect: { minBackoffMs: 500, maxBackoffMs: 30_000, randomFactor: 0.2, maxAttempts: -1, // -1 = unlimited}Configurable per actor. The base uses the same exponential-backoff math as BackoffPolicy, with jitter to avoid synchronized retries across clients.
Each attempt fires BrokerReconnectAttempt on the event stream;
after maxAttempts exhausted (if finite), BrokerReconnectFailed
fires and the actor stays disconnected.
Lifecycle events
Section titled “Lifecycle events”Published on system.eventStream:
| Event | When |
|---|---|
BrokerConnected | A connectImpl succeeded. |
BrokerDisconnected | A disconnectImpl ran or a connection failed. |
BrokerReconnectAttempt | A reconnect attempt is starting. |
BrokerReconnectFailed | maxAttempts exhausted. |
BrokerBufferOverflow | The outbound buffer dropped an envelope. |
BrokerNotConnected | Sent without a connection. |
Subscribe to monitor every broker actor uniformly:
system.eventStream.subscribe(monitorRef, BrokerConnected);system.eventStream.subscribe(monitorRef, BrokerDisconnected);The events include actorPath — distinguish events from
different broker actors in the system.
Settings resolution
Section titled “Settings resolution”1. builtInDefaults() ← lowest priority (always applied)2. readSettingsFromConfig() ← HOCON overrides3. Constructor argument ← highest priority (per-instance)preStart merges the three layers, validates against
requiredSettings(), and stashes the result for the rest of the
actor’s life via this.settings.
Missing required settings cause an early-error throw on
preStart — the actor goes through the supervisor’s failure
path before it ever attempts to connect.
Writing a custom protocol actor
Section titled “Writing a custom protocol actor”import { BrokerActor, type OutboundEnvelope, type BrokerCommonSettings } from 'actor-ts';
interface MyProtocolSettings extends BrokerCommonSettings { readonly url: string;}
class MyProtocolActor extends BrokerActor<MyProtocolSettings, Cmd, MyPayload> { private conn: MyClient | null = null;
protected configKey() { return 'actor-ts.io.broker.my-protocol'; } protected builtInDefaults() { return { /* ... */ }; } protected readSettingsFromConfig(c) { /* parse HOCON */ return {}; } protected requiredSettings() { return ['url'] as const; } protected endpointLabel() { return this.settings.url; }
protected async connectImpl(): Promise<void> { this.conn = await MyClient.connect(this.settings.url); this.conn.onMessage((m) => this.fanOut(m.topic, m)); }
protected async disconnectImpl(): Promise<void> { await this.conn?.close(); this.conn = null; }
protected async dispatchOutgoing(env: OutboundEnvelope<MyPayload>): Promise<void> { await this.conn!.send(env.payload); }}The base handles the rest. Most third-party clients (kafkajs,
nats.js, etc.) have an event-based message-receive API that
maps cleanly to this.fanOut(...).
Where to next
Section titled “Where to next”- I/O overview — the bigger picture: which protocols ship.
- Kafka / MQTT / NATS / etc. — per-protocol pages.
- Event stream — where lifecycle events are published.
- Backoff policy — the math behind the reconnect loop.