BrokerActor base class

Every protocol actor in actor-ts/io/broker (KafkaActor, MqttActor, NatsActor, etc.) extends BrokerActor. The base class owns the shared lifecycle: connection state machine, reconnect-with-backoff, outbound buffer, subscriber fan-out, and lifecycle event publishing.

BrokerActor (abstract base)
├── lifecycle state machine
├── outbound buffer
├── reconnect loop
├── subscriber tracking
└── three abstract methods:
    ├── connectImpl()
    ├── disconnectImpl()
    └── dispatchOutgoing(envelope)

Subclasses implement the three protocol hooks; the base handles the rest. This page documents what’s shared. For per-protocol specifics, see the per-protocol pages.

disconnected ──connect──► connecting ──ok──► connected
     ▲                        │                  │
     │                      fail                 ▼
     │                        ▼            disconnecting
     │                 reconnect cycle           │
     └────────────────────────┴──────────────────┘

Four states:

  • disconnected: initial state; not currently connected.
  • connecting: connectImpl is running.
  • connected: connection up; messages flow.
  • disconnecting: disconnectImpl is running.

A failed disconnected → connecting transition triggers a reconnect loop: back off and retry until the connection succeeds or maxAttempts is exhausted.
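
The four states and the transitions in the diagram can be sketched as a small lookup table. This is only an illustration of the shape of the state machine: BrokerState, BrokerTransition, transitions, and nextState are hypothetical names, not part of the actor-ts API, and the sketch covers only the arrows drawn above.

```typescript
// Hypothetical sketch of the lifecycle; these names are illustrative, not actor-ts API.
type BrokerState = 'disconnected' | 'connecting' | 'connected' | 'disconnecting';
type BrokerTransition = 'connect' | 'ok' | 'fail' | 'disconnect' | 'closed';

// Allowed transitions per state, taken from the diagram. Anything not listed is ignored.
const transitions: Record<BrokerState, Partial<Record<BrokerTransition, BrokerState>>> = {
  disconnected: { connect: 'connecting' },
  connecting: { ok: 'connected', fail: 'disconnected' }, // a failure here starts the reconnect cycle
  connected: { disconnect: 'disconnecting' },
  disconnecting: { closed: 'disconnected' },
};

// Returns the next state, or the current state if the event does not apply.
function nextState(state: BrokerState, event: BrokerTransition): BrokerState {
  return transitions[state][event] ?? state;
}
```

Keeping the transitions in a table makes it easy to see that an event such as connect is simply ignored while the actor is already connected.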

abstract class BrokerActor<S, Cmd, P> extends Actor<Cmd> {
  // Subclasses implement:
  protected abstract configKey(): string;
  protected abstract builtInDefaults(): Partial<S>;
  protected abstract readSettingsFromConfig(config: Config): Partial<S>;
  protected abstract requiredSettings(): ReadonlyArray<keyof S>;
  protected abstract endpointLabel(): string;
  protected abstract connectImpl(): Promise<void>;
  protected abstract disconnectImpl(): Promise<void>;
  protected abstract dispatchOutgoing(envelope: OutboundEnvelope<P>): Promise<void>;
}

Two categories:

  • Settings glue (configKey, builtInDefaults, readSettingsFromConfig, requiredSettings, endpointLabel) — describes how to assemble settings from the three layers (constructor + HOCON + defaults) and how to validate.
  • Protocol hooks (connectImpl, disconnectImpl, dispatchOutgoing) — the protocol-specific work.

endpointLabel is the human-readable connection identity (“amqp://localhost:5672”, “kafka-cluster-1”) used in log lines and lifecycle events.

this.outbound({ topic: 'orders', payload: ... });

Subclasses call this.outbound(envelope) to send. The base class:

  • If connected — calls dispatchOutgoing(envelope) immediately.
  • If disconnected — buffers up to outboundBuffer.capacity envelopes. On reconnect, drains the buffer.

When the buffer is full, the configured overflow policy applies: 'drop-oldest', 'drop-new', or 'reject' (throws BrokerBufferOverflow).
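
The three overflow policies can be illustrated with a small bounded FIFO buffer. This is a minimal sketch under stated assumptions, not the actor-ts implementation: OverflowPolicy, BoundedBuffer, and BufferOverflowError are hypothetical names, and the capacity is assumed to be at least 1.

```typescript
// Hypothetical sketch; these names are illustrative and not exported by actor-ts.
type OverflowPolicy = 'drop-oldest' | 'drop-new' | 'reject';

class BufferOverflowError extends Error {}

class BoundedBuffer<T> {
  private readonly items: T[] = [];
  private readonly capacity: number;
  private readonly policy: OverflowPolicy;

  // Assumes capacity >= 1.
  constructor(capacity: number, policy: OverflowPolicy) {
    this.capacity = capacity;
    this.policy = policy;
  }

  // Returns true if the item was stored, false if it was dropped.
  push(item: T): boolean {
    if (this.items.length < this.capacity) {
      this.items.push(item);
      return true;
    }
    switch (this.policy) {
      case 'drop-oldest':
        this.items.shift(); // evict the oldest entry to make room
        this.items.push(item);
        return true;
      case 'drop-new':
        return false; // keep what is buffered, discard the newcomer
      case 'reject':
        throw new BufferOverflowError(`buffer full (capacity ${this.capacity})`);
    }
  }

  // Removes and returns everything in FIFO order, as a drain on reconnect would.
  drain(): T[] {
    return this.items.splice(0, this.items.length);
  }
}
```

With capacity 2 and 'drop-oldest', pushing 1, 2, 3 and then draining yields 2 and 3; with 'drop-new' the same sequence yields 1 and 2.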

// Inside a BrokerActor subclass:
onReceive(msg: Cmd): void {
  if (msg.kind === 'subscribe') {
    this.subscribe(msg.topic, msg.subscriber);
  } else if (msg.kind === 'unsubscribe') {
    this.unsubscribe(msg.topic, msg.subscriber);
  }
}

subscribe(topic, ref) registers ref as interested in topic’s inbound messages. The base death-watches the ref — when it stops, the subscription is removed automatically. No leaks.

When the protocol pushes an inbound message, the subclass calls:

this.fanOut(topic, inboundMessage);

The base delivers to every subscriber for that topic.
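
The bookkeeping behind subscribe, unsubscribe, and fanOut can be pictured as a map from topic to a set of refs. The sketch below is an assumption about the general shape, not the base class's actual code: SubscriberRef, SubscriberRegistry, and removeEverywhere are hypothetical names, and the death-watch is represented only by the method a termination handler would call.

```typescript
// Hypothetical sketch; SubscriberRef and SubscriberRegistry are not actor-ts types.
interface SubscriberRef<M> {
  tell(message: M): void;
}

class SubscriberRegistry<M> {
  private readonly byTopic = new Map<string, Set<SubscriberRef<M>>>();

  subscribe(topic: string, ref: SubscriberRef<M>): void {
    let refs = this.byTopic.get(topic);
    if (!refs) {
      refs = new Set();
      this.byTopic.set(topic, refs);
    }
    refs.add(ref); // a Set makes repeated subscribes idempotent
  }

  unsubscribe(topic: string, ref: SubscriberRef<M>): void {
    const refs = this.byTopic.get(topic);
    if (!refs) return;
    refs.delete(ref);
    if (refs.size === 0) this.byTopic.delete(topic); // do not keep empty topics around
  }

  // What a death-watch handler would call when a subscriber stops.
  removeEverywhere(ref: SubscriberRef<M>): void {
    for (const topic of [...this.byTopic.keys()]) this.unsubscribe(topic, ref);
  }

  // Delivers to every subscriber of the topic; returns the number of deliveries.
  fanOut(topic: string, message: M): number {
    const refs = this.byTopic.get(topic);
    if (!refs) return 0;
    for (const ref of refs) ref.tell(message);
    return refs.size;
  }
}
```

Removing a stopped ref from every topic in one place is what prevents the registry from accumulating dead subscribers.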

reconnect: {
  minBackoffMs: 500,
  maxBackoffMs: 30_000,
  randomFactor: 0.2,
  maxAttempts: -1, // -1 = unlimited
}

Configurable per actor. The base uses the same exponential-backoff math as BackoffPolicy, with jitter to avoid synchronized retries across clients.

Each attempt publishes BrokerReconnectAttempt on the event stream. If maxAttempts is finite and becomes exhausted, BrokerReconnectFailed is published and the actor stays disconnected.
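
To make the settings concrete, here is one common way to compute an exponential backoff delay with jitter. The exact formula used by BackoffPolicy is not shown on this page, so the doubling per attempt and the upward-only jitter below are assumptions; ReconnectSettings, backoffDelayMs, and shouldRetry are hypothetical names.

```typescript
// Hypothetical sketch; the real BackoffPolicy math may differ in detail.
interface ReconnectSettings {
  minBackoffMs: number;
  maxBackoffMs: number;
  randomFactor: number;
  maxAttempts: number; // -1 = unlimited
}

// attempt is 1-based. random is injectable so the result can be made deterministic.
function backoffDelayMs(
  attempt: number,
  s: ReconnectSettings,
  random: () => number = Math.random,
): number {
  const exponential = s.minBackoffMs * 2 ** (attempt - 1); // 500, 1000, 2000, ...
  const capped = Math.min(exponential, s.maxBackoffMs);
  const jitter = 1 + random() * s.randomFactor; // assumed: stretch by up to randomFactor
  return Math.min(capped * jitter, s.maxBackoffMs);
}

// A negative maxAttempts means retry forever.
function shouldRetry(attempt: number, s: ReconnectSettings): boolean {
  return s.maxAttempts < 0 || attempt <= s.maxAttempts;
}
```

With the defaults shown above and no jitter, the third attempt waits 2000 ms, and any attempt whose doubled delay exceeds maxBackoffMs waits 30 000 ms.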

Published on system.eventStream:

Event                     When
BrokerConnected           A connectImpl succeeded.
BrokerDisconnected        A disconnectImpl ran or a connection failed.
BrokerReconnectAttempt    A reconnect attempt is starting.
BrokerReconnectFailed     maxAttempts exhausted.
BrokerBufferOverflow      The outbound buffer dropped an envelope.
BrokerNotConnected        Sent without a connection.

Subscribe to monitor every broker actor uniformly:

system.eventStream.subscribe(monitorRef, BrokerConnected);
system.eventStream.subscribe(monitorRef, BrokerDisconnected);

Each event includes actorPath, so you can distinguish events from different broker actors in the system.

1. builtInDefaults() ← lowest priority (always applied)
2. readSettingsFromConfig() ← HOCON overrides
3. Constructor argument ← highest priority (per-instance)

preStart merges the three layers, validates against requiredSettings(), and stashes the result for the rest of the actor’s life via this.settings.

Missing required settings make preStart throw, so the actor goes through the supervisor’s failure path before it ever attempts to connect.
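
The merge-then-validate step can be sketched with object spreads, where later layers win. This is an illustration of the layering described above, not the code preStart actually runs: resolveSettings and the DemoSettings shape in the usage note are hypothetical.

```typescript
// Hypothetical helper; resolveSettings is not an actor-ts function.
function resolveSettings<S extends object>(
  defaults: Partial<S>,
  fromConfig: Partial<S>,
  fromConstructor: Partial<S>,
  required: ReadonlyArray<keyof S>,
): S {
  // Later spreads win: defaults < HOCON < constructor argument.
  // Note: a key explicitly set to undefined in a later layer also overrides.
  const merged = { ...defaults, ...fromConfig, ...fromConstructor } as Partial<S>;

  // Fail early if any required key is still unset after merging.
  const missing = required.filter((key) => merged[key] === undefined);
  if (missing.length > 0) {
    throw new Error(`missing required settings: ${missing.map(String).join(', ')}`);
  }
  return merged as S;
}
```

For a settings type with url and clientId, calling resolveSettings({ clientId: 'default' }, { url: 'a' }, { url: 'b' }, ['url']) gives url 'b' from the constructor layer and clientId 'default' from the built-in defaults.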

// Config is assumed to be exported from 'actor-ts' alongside the other types.
import { BrokerActor, type Config, type OutboundEnvelope, type BrokerCommonSettings } from 'actor-ts';

// Cmd, MyPayload, and MyClient stand in for your own command type, payload type, and protocol client.
interface MyProtocolSettings extends BrokerCommonSettings {
  readonly url: string;
}

class MyProtocolActor extends BrokerActor<MyProtocolSettings, Cmd, MyPayload> {
  private conn: MyClient | null = null;

  // Settings glue
  protected configKey() { return 'actor-ts.io.broker.my-protocol'; }
  protected builtInDefaults() { return { /* ... */ }; }
  protected readSettingsFromConfig(c: Config) { /* parse HOCON */ return {}; }
  protected requiredSettings() { return ['url'] as const; }
  protected endpointLabel() { return this.settings.url; }

  // Protocol hooks
  protected async connectImpl(): Promise<void> {
    this.conn = await MyClient.connect(this.settings.url);
    // Forward every inbound message to the subscribers of its topic.
    this.conn.onMessage((m) => this.fanOut(m.topic, m));
  }

  protected async disconnectImpl(): Promise<void> {
    await this.conn?.close();
    this.conn = null;
  }

  protected async dispatchOutgoing(env: OutboundEnvelope<MyPayload>): Promise<void> {
    // The base only dispatches while connected, so conn is expected to be set here.
    await this.conn!.send(env.payload);
  }
}

The base handles the rest. Most third-party clients (kafkajs, nats.js, etc.) have an event-based message-receive API that maps cleanly to this.fanOut(...).