I/O overview

Actors handle in-process work cleanly; the outside world is messier — sockets connect and disconnect, brokers go down, HTTP clients pile up, payloads cross protocol boundaries.

The framework’s I/O modules wrap each protocol in an actor that hides the protocol behind a uniform contract: connect on preStart, publish events to subscribers when messages arrive, buffer outgoing messages while disconnected, reconnect with backoff, surface lifecycle events on the event stream.

    ┌─────────────────────────────────┐
    │  BrokerActor (base class)       │
    │                                 │
    │  • Lifecycle state machine      │
    │    disconnected → connecting →  │
    │    connected → disconnecting    │
    │                                 │
    │  • Outbound buffer (bounded)    │
    │  • Reconnect with backoff       │
    │  • Subscriber fan-out           │
    │  • Lifecycle events             │
    │    on system.eventStream        │
    └─────────────────────────────────┘
     ┌───────────────┼───────────────┐
     │               │               │
KafkaActor       MqttActor       NatsActor  (et al.)
     │               │               │
   Kafka           MQTT            NATS
   wire            wire            wire

Subclasses implement three hooks (connectImpl, disconnectImpl, dispatchOutgoing); the base handles the rest.

| Module | Protocol | Common use |
| --- | --- | --- |
| KafkaActor | Apache Kafka | High-throughput durable log; consumer-group fan-out. |
| MqttActor | MQTT 3.1.1 / 5 | IoT, telemetry; QoS 0/1/2 + retained messages. |
| AmqpActor | RabbitMQ / AMQP 0.9.1 | Topic / queue routing, work queues. |
| NatsActor | NATS core | Lightweight pub/sub; request-reply. |
| JetStreamActor | NATS JetStream | Durable streaming on NATS. |
| RedisStreamsActor | Redis XADD/XREAD | Lightweight streaming + consumer groups. |
| GrpcClientActor / GrpcServerActor | gRPC | Typed RPC + streaming. |
| WebSocketActor (client) | WebSocket | Outgoing connection. |
| ServerWebSocketActor | WebSocket | Incoming connections (server side). |
| SseActor | Server-Sent Events | One-way streaming from server. |
| TcpSocketActor | Raw TCP | Custom protocols. |
| UdpSocketActor | Raw UDP | Telemetry, multicast, low-latency. |

Each has its own page in this section. This overview covers the common pattern they all share.

disconnected ──▶ connecting ──▶ connected
▲ │
│ ▼
└────── disconnecting ◀────────┘

Linear transitions during normal operation:

  • disconnected: initial state; not currently connected.
  • connecting: connectImpl is running.
  • connected: connection is up; messages flow.
  • disconnecting: disconnectImpl is running.

A failure while connected triggers a transition to disconnected and starts a reconnect loop. Stopping the actor transitions through disconnecting from any state.
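
The transitions described above can be written down as a small lookup table. This is a minimal sketch, not the framework's implementation: `BrokerState`, `allowed`, and `canTransition` are hypothetical names, and the fallback from connecting to disconnected on a failed attempt is an assumption the text does not state.

```typescript
type BrokerState = 'disconnected' | 'connecting' | 'connected' | 'disconnecting';

// Allowed next states for each state, following the diagram and prose above.
const allowed: Record<BrokerState, readonly BrokerState[]> = {
  // Normal start, or a stop request.
  disconnected: ['connecting', 'disconnecting'],
  // Success, assumed fallback after a failed attempt, or a stop request.
  connecting: ['connected', 'disconnected', 'disconnecting'],
  // Unexpected failure, or a stop request.
  connected: ['disconnected', 'disconnecting'],
  // disconnectImpl has finished.
  disconnecting: ['disconnected'],
};

// True when moving from `from` to `to` is one of the listed transitions.
function canTransition(from: BrokerState, to: BrokerState): boolean {
  return allowed[from].indexOf(to) !== -1;
}
```

A table like this makes illegal jumps, such as disconnecting straight back to connected, easy to reject in one place.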

Subscribe to the lifecycle events via the event stream:

import { Actor, BrokerConnected, BrokerDisconnected, BrokerReconnectAttempt } from 'actor-ts';

class Monitor extends Actor<BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt> {
  // Register for each lifecycle event type as soon as the actor starts.
  override preStart(): void {
    this.system.eventStream.subscribe(this.self, BrokerConnected);
    this.system.eventStream.subscribe(this.self, BrokerDisconnected);
    this.system.eventStream.subscribe(this.self, BrokerReconnectAttempt);
  }

  // Log the class name of whichever lifecycle event arrived.
  override onReceive(event: BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt): void {
    this.log.info(`broker event: ${event.constructor.name}`);
  }
}

The event payload includes which broker actor emitted it (by actorPath), so a single monitor can watch all brokers in the system.

When a connected broker disconnects unexpectedly, the base class schedules a reconnect with exponential backoff. The backoff is configured through BrokerCommonSettings:

{
  reconnect: {
    minBackoffMs: 500,
    maxBackoffMs: 30_000,
    randomFactor: 0.2,
    maxAttempts: -1, // -1 = unlimited
  }
}

Each attempt fires a BrokerReconnectAttempt event. When maxAttempts is exceeded (if finite), the actor surfaces BrokerReconnectFailed and stays disconnected; messages buffered in the outbound queue are eventually dropped per the buffer policy.
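
To make these settings concrete, here is one common way to turn them into a delay. It is a sketch under assumptions, since the framework's exact formula is not stated here: doubling the delay per attempt, applying randomFactor as additive jitter of up to that fraction, and the names `ReconnectSettings`, `backoffDelayMs`, and `mayRetry` are all hypothetical.

```typescript
interface ReconnectSettings {
  minBackoffMs: number;
  maxBackoffMs: number;
  randomFactor: number;
  maxAttempts: number; // -1 = unlimited
}

// Delay before reconnect attempt number `attempt` (1-based).
// `random` is injectable so the function can be tested deterministically.
function backoffDelayMs(
  s: ReconnectSettings,
  attempt: number,
  random: () => number = Math.random,
): number {
  // Double the base delay on each attempt, capped at maxBackoffMs.
  const base = Math.min(s.maxBackoffMs, s.minBackoffMs * Math.pow(2, attempt - 1));
  // Add up to randomFactor * base of jitter so many clients do not retry in lockstep.
  const jittered = base * (1 + random() * s.randomFactor);
  return Math.min(s.maxBackoffMs, Math.round(jittered));
}

// True when another attempt is allowed after `attemptsSoFar` failed attempts.
function mayRetry(s: ReconnectSettings, attemptsSoFar: number): boolean {
  return s.maxAttempts < 0 || attemptsSoFar < s.maxAttempts;
}
```

With the values shown above and jitter disabled, this sketch yields 500 ms, 1 000 ms, and 2 000 ms for the first three attempts and reaches the 30 000 ms cap on the seventh.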

While disconnected, outgoing messages don’t fail immediately — they’re enqueued in a bounded outbound buffer. When the connection restores, the buffer is drained.

Both the size and the overflow policy are configurable:

{
  outboundBuffer: {
    capacity: 1_000,
    overflow: 'drop-oldest', // or 'drop-new' or 'reject'
  }
}

When the buffer overflows, the base publishes a BrokerBufferOverflow event so you can wire it into metrics.
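
The three overflow policies differ only in what happens to a message that arrives when the buffer is full. The sketch below illustrates that difference. It is not the framework's buffer: `OutboundBuffer`, `offer`, `drain`, and the result strings are hypothetical, and it assumes that 'drop-new' discards the incoming message silently while 'reject' reports the failure to the caller.

```typescript
type OverflowPolicy = 'drop-oldest' | 'drop-new' | 'reject';
type OfferResult = 'enqueued' | 'dropped-oldest' | 'dropped-new' | 'rejected';

class OutboundBuffer<T> {
  private readonly items: T[] = [];
  private readonly capacity: number;
  private readonly overflow: OverflowPolicy;

  constructor(capacity: number, overflow: OverflowPolicy) {
    this.capacity = capacity;
    this.overflow = overflow;
  }

  // Try to enqueue a message; the result says what happened on overflow.
  offer(item: T): OfferResult {
    if (this.items.length < this.capacity) {
      this.items.push(item);
      return 'enqueued';
    }
    if (this.overflow === 'drop-oldest') {
      this.items.shift(); // evict the head to make room
      this.items.push(item);
      return 'dropped-oldest';
    }
    if (this.overflow === 'drop-new') {
      return 'dropped-new'; // assumed: the incoming message is discarded
    }
    return 'rejected'; // assumed: the caller is told the send failed
  }

  // Remove and return everything in FIFO order, e.g. after a reconnect.
  drain(): T[] {
    return this.items.splice(0, this.items.length);
  }
}
```

Any result other than 'enqueued' is the point where an overflow event would be published.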

For inbound messages (e.g., a Kafka message landing on a subscribed topic), the broker actor publishes to subscribers — actor refs registered as interested:

const kafka = system.actorOf(
  Props.create(() => new KafkaActor({
    bootstrapServers: 'localhost:9092',
    consumer: { groupId: 'my-app', topics: ['orders'] },
  })),
);
const handler = system.actorOf(Props.create(() => new OrderHandler()));
kafka.tell({ kind: 'subscribe', subscriber: handler });
// Now every inbound order is told to `handler`.

Different broker actors have different subscription semantics (a single topic for MQTT, a consumer-group + topic list for Kafka), but the pattern is consistent: register a subscriber ref, receive the protocol’s inbound messages as actor messages.
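
Stripped of the protocol, the fan-out itself is a list of refs that each receive every inbound message. The following self-contained sketch shows that idea only. `Ref`, `FanOut`, `handle`, and `publish` are hypothetical stand-ins rather than actor-ts types, and the 'unsubscribe' command is an assumption (the example above only shows 'subscribe').

```typescript
// Stand-in for an actor ref: anything that accepts a message.
interface Ref<T> {
  tell(message: T): void;
}

type Command<T> =
  | { kind: 'subscribe'; subscriber: Ref<T> }
  | { kind: 'unsubscribe'; subscriber: Ref<T> }; // assumed counterpart

class FanOut<T> {
  private readonly subscribers: Ref<T>[] = [];

  // Register or remove a subscriber; duplicates are ignored.
  handle(cmd: Command<T>): void {
    const index = this.subscribers.indexOf(cmd.subscriber);
    if (cmd.kind === 'subscribe' && index === -1) {
      this.subscribers.push(cmd.subscriber);
    } else if (cmd.kind === 'unsubscribe' && index !== -1) {
      this.subscribers.splice(index, 1);
    }
  }

  // Called once per inbound protocol message: tell every subscriber.
  publish(message: T): void {
    for (const subscriber of this.subscribers) {
      subscriber.tell(message);
    }
  }
}
```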

Settings are resolved from three layers, highest precedence first:

  1. Constructor argument — what you passed when spawning the actor.
  2. HOCON config under a protocol-specific key (e.g. actor-ts.io.kafka { bootstrap-servers = "..." }).
  3. Built-in defaults — sensible production defaults the actor ships with.

This means you can set system-wide defaults in application.conf and override per-instance via the constructor. Per-broker subpages spell out which keys each protocol reads.
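
The precedence order amounts to "take the first layer that defines the value". The sketch below shows that rule for a flat settings object. It is illustrative only: `ExampleSettings`, `resolveSettings`, and the `clientId` field are hypothetical, and how the framework actually merges nested settings is not covered here.

```typescript
// Hypothetical flat settings shape used only for this illustration.
interface ExampleSettings {
  bootstrapServers: string;
  clientId: string;
}

// Resolve each field from the highest-precedence layer that defines it:
// constructor argument, then HOCON config, then built-in defaults.
function resolveSettings(
  fromConstructor: Partial<ExampleSettings>,
  fromConfig: Partial<ExampleSettings>,
  builtInDefaults: ExampleSettings,
): ExampleSettings {
  return {
    bootstrapServers:
      fromConstructor.bootstrapServers ??
      fromConfig.bootstrapServers ??
      builtInDefaults.bootstrapServers,
    clientId:
      fromConstructor.clientId ??
      fromConfig.clientId ??
      builtInDefaults.clientId,
  };
}

// Config sets a system-wide server; one instance overrides only its client id.
const resolved = resolveSettings(
  { clientId: 'orders' },
  { bootstrapServers: 'kafka:9092' },
  { bootstrapServers: 'localhost:9092', clientId: 'default-client' },
);
```

Here `resolved.bootstrapServers` comes from the config layer and `resolved.clientId` from the constructor layer, so neither default is used.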

The broker actors are a good fit in three situations:

  1. Bridging an external broker into the actor world — Kafka, MQTT, NATS messages need to become actor messages and vice versa. Don’t build the protocol yourself; use the provided actors.
  2. Long-lived connections to a service that pushes data — gRPC streams, WebSocket clients, SSE consumers. The broker actor manages reconnect and buffering.
  3. Mutually-exclusive resource ownership — exactly one TCP connection to a legacy service. The broker actor model serializes access naturally.

To support a protocol that has no provided actor, extend BrokerActor, implement the three abstract methods, and wire up settings:

import { BrokerActor, type BrokerCommonSettings } from 'actor-ts';

class MyProtocolActor extends BrokerActor<MyOutbound> {
  protected configKey() { return 'actor-ts.io.my-protocol'; }
  protected builtInDefaults(): BrokerCommonSettings { return { /* defaults */ }; }
  protected readSettingsFromConfig(c) { /* parse HOCON */ }
  protected requiredSettings() { return ['url']; }

  protected async connectImpl(): Promise<void> {
    // Establish the connection. Throw on failure.
  }

  protected async disconnectImpl(): Promise<void> {
    // Clean up the connection.
  }

  protected async dispatchOutgoing(env: OutboundEnvelope<MyOutbound>): Promise<void> {
    // Send the payload over the connection.
  }
}

The base class handles state transitions, buffering, reconnect, and event publishing. Subclasses own only the protocol-specific parts.

The BrokerActor API reference covers the full base-class surface.