I/O overview
Ce contenu n’est pas encore disponible dans votre langue.
Actors handle in-process work cleanly; the outside world is messier — sockets connect and disconnect, brokers go down, HTTP clients pile up, payloads cross protocol boundaries.
The framework’s I/O modules wrap each protocol in an actor that
hides the protocol behind a uniform contract: connect on
preStart, publish events to subscribers when messages arrive,
buffer outgoing messages while disconnected, reconnect with
backoff, surface lifecycle events on the event stream.
Subclasses implement three hooks (connectImpl, disconnectImpl,
dispatchOutgoing); the base handles the rest.
The available brokers
Section titled “The available brokers”| Module | Protocol | Common use |
|---|---|---|
KafkaActor | Apache Kafka | High-throughput durable log; consumer-group fan-out. |
MqttActor | MQTT 3.1.1 / 5 | IoT, telemetry; QoS 0/1/2 + retained messages. |
AmqpActor | RabbitMQ / AMQP 0.9.1 | Topic / queue routing, work queues. |
NatsActor | NATS core | Lightweight pub/sub; request-reply. |
JetStreamActor | NATS JetStream | Durable streaming on NATS. |
RedisStreamsActor | Redis XADD/XREAD | Lightweight streaming + consumer groups. |
GrpcClientActor / GrpcServerActor | gRPC | Typed RPC + streaming. |
WebSocketActor (client) | WebSocket | Outgoing connection. |
ServerWebSocketActor | WebSocket | Incoming connections (server side). |
SseActor | Server-Sent Events | One-way streaming from server. |
TcpSocketActor | Raw TCP | Custom protocols. |
UdpSocketActor | Raw UDP | Telemetry, multicast, low-latency. |
Each has its own page in this section. This overview covers the common pattern they all share.
The lifecycle state machine
Section titled “The lifecycle state machine”Linear transitions during normal operation:
disconnected— initial state; not currently connected.connecting—connectImplis running.connected— connection is up; messages flow.disconnecting—disconnectImplis running.
Failure during connected triggers a transition to disconnected
- a reconnect loop. Stop transitions through
disconnectingfrom any state.
Subscribe to the lifecycle events via the event stream:
import { BrokerConnected, BrokerDisconnected, BrokerReconnectAttempt } from 'actor-ts';
class Monitor extends Actor<BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt> { override preStart(): void { this.system.eventStream.subscribe(this.self, BrokerConnected); this.system.eventStream.subscribe(this.self, BrokerDisconnected); this.system.eventStream.subscribe(this.self, BrokerReconnectAttempt); } override onReceive(event: BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt): void { this.log.info(`broker event: ${event.constructor.name}`); }}The event payload includes which broker actor emitted it (by
actorPath), so a single monitor can watch all brokers in the
system.
Reconnect with backoff
Section titled “Reconnect with backoff”When a connected broker disconnects unexpectedly, the base class
schedules a reconnect with exponential backoff. Configurable via
BrokerCommonSettings:
{ reconnect: { minBackoffMs: 500, maxBackoffMs: 30_000, randomFactor: 0.2, maxAttempts: -1, // -1 = unlimited }}Each attempt fires a BrokerReconnectAttempt event. When
maxAttempts is exceeded (if finite), the actor surfaces
BrokerReconnectFailed and stays disconnected; messages buffered
in the outbound queue are eventually dropped per the buffer policy.
Outbound buffer
Section titled “Outbound buffer”While disconnected, outgoing messages don’t fail immediately — they’re enqueued in a bounded outbound buffer. When the connection restores, the buffer is drained.
Configurable size + overflow policy:
{ outboundBuffer: { capacity: 1_000, overflow: 'drop-oldest', // or 'drop-new' or 'reject' }}When the buffer overflows, the base publishes a BrokerBufferOverflow
event so you can wire it into metrics.
Subscriber fan-out
Section titled “Subscriber fan-out”For inbound messages (e.g., a Kafka message landing on a subscribed topic), the broker actor publishes to subscribers — actor refs registered as interested:
const kafka = system.spawn( Props.create(() => new KafkaActor({ bootstrapServers: 'localhost:9092', consumer: { groupId: 'my-app', topics: ['orders'] }, })),);
const handler = system.spawnAnonymous(Props.create(() => new OrderHandler()));
kafka.tell({ kind: 'subscribe', subscriber: handler });// Now every inbound order is told to `handler`.Different broker actors have different subscription semantics (a single topic for MQTT, a consumer-group + topic list for Kafka), but the pattern is consistent: register a subscriber ref, receive the protocol’s inbound messages as actor messages.
Settings precedence
Section titled “Settings precedence”Three layers, highest-first:
- Constructor argument — what you passed when spawning the actor.
- HOCON config under a protocol-specific key (e.g.
actor-ts.io.kafka { bootstrap-servers = "..." }). - Built-in defaults — sensible production defaults the actor ships with.
This means you can set system-wide defaults in application.conf
and override per-instance via the constructor. Per-broker subpages
spell out which keys each protocol reads.
When to use a broker actor
Section titled “When to use a broker actor”Three good fits:
- Bridging an external broker into the actor world — Kafka, MQTT, NATS messages need to become actor messages and vice versa. Don’t build the protocol yourself; use the provided actors.
- Long-lived connections to a service that pushes data — gRPC streams, WebSocket clients, SSE consumers. The broker actor manages reconnect and buffering.
- Mutually-exclusive resource ownership — exactly one TCP connection to a legacy service. The broker actor model serializes access naturally.
What’s NOT a broker actor
Section titled “What’s NOT a broker actor”Writing a custom broker actor
Section titled “Writing a custom broker actor”Implement the three abstract methods and wire up settings:
import { BrokerActor, type BrokerCommonSettings } from 'actor-ts';
class MyProtocolActor extends BrokerActor<MyOutbound> { protected configKey() { return 'actor-ts.io.my-protocol'; } protected builtInDefaults(): BrokerCommonSettings { return { /* defaults */ }; } protected readSettingsFromConfig(c) { /* parse HOCON */ } protected requiredSettings() { return ['url']; }
protected async connectImpl(): Promise<void> { // Establish the connection. Throw on failure. } protected async disconnectImpl(): Promise<void> { // Clean up the connection. } protected async dispatchOutgoing(env: OutboundEnvelope<MyOutbound>): Promise<void> { // Send the payload over the connection. }}The base class handles state transitions, buffering, reconnect, event publishing. Subclasses only own the protocol-specific parts.
Where to next
Section titled “Where to next”- Per-protocol pages — Kafka, MQTT, AMQP, NATS, Redis Streams, gRPC, WebSocket client, WebSocket server, SSE, TCP, UDP.
- BrokerActor base class — the shared lifecycle and configuration.
- HTTP overview — the separate HTTP server module.
- Event stream — where broker lifecycle events are published.
The BrokerActor API
reference covers the full base-class surface.