I/O overview

Actors handle in-process work cleanly; the outside world is messier — sockets connect and disconnect, brokers go down, HTTP clients pile up, payloads cross protocol boundaries.

The framework’s I/O modules wrap each protocol in an actor that hides the protocol behind a uniform contract: connect on preStart, publish events to subscribers when messages arrive, buffer outgoing messages while disconnected, reconnect with backoff, surface lifecycle events on the event stream.

[Diagram: BrokerActor (base class) provides the lifecycle state machine (disconnected → connecting → connected → disconnecting), a bounded outbound buffer, reconnect with backoff, subscriber fan-out, and lifecycle events on system.eventStream. KafkaActor, MqttActor, NatsActor, et al. extend it, and each speaks its own wire protocol (Kafka, MQTT, NATS).]

Subclasses implement three hooks (connectImpl, disconnectImpl, dispatchOutgoing); the base handles the rest.

| Module | Protocol | Common use |
|---|---|---|
| KafkaActor | Apache Kafka | High-throughput durable log; consumer-group fan-out. |
| MqttActor | MQTT 3.1.1 / 5 | IoT, telemetry; QoS 0/1/2 + retained messages. |
| AmqpActor | RabbitMQ / AMQP 0.9.1 | Topic / queue routing, work queues. |
| NatsActor | NATS core | Lightweight pub/sub; request-reply. |
| JetStreamActor | NATS JetStream | Durable streaming on NATS. |
| RedisStreamsActor | Redis XADD/XREAD | Lightweight streaming + consumer groups. |
| GrpcClientActor / GrpcServerActor | gRPC | Typed RPC + streaming. |
| WebSocketActor (client) | WebSocket | Outgoing connection. |
| ServerWebSocketActor | WebSocket | Incoming connections (server side). |
| SseActor | Server-Sent Events | One-way streaming from server. |
| TcpSocketActor | Raw TCP | Custom protocols. |
| UdpSocketActor | Raw UDP | Telemetry, multicast, low-latency. |

Each has its own page in this section. This overview covers the common pattern they all share.

[State diagram: disconnected → connecting → connected → disconnecting]

Linear transitions during normal operation:

  • disconnected: initial state; not currently connected.
  • connecting: connectImpl is running.
  • connected: connection is up; messages flow.
  • disconnecting: disconnectImpl is running.

Failure during connected triggers a transition to disconnected plus a reconnect loop. Stop transitions through disconnecting from any state.
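The transitions described above can be sketched as a small transition table. This is a standalone illustration, not the framework's implementation: `BrokerState`, `transitions`, `canTransition`, and `Lifecycle` are hypothetical names, and the edge from connecting back to disconnected (a failed connect attempt) is an assumption the text does not spell out.

```typescript
// Hypothetical sketch; these names are illustrative, not the actor-ts API.
type BrokerState = 'disconnected' | 'connecting' | 'connected' | 'disconnecting';

// Allowed next states for each state.
// Assumption: a failed connect attempt falls back to 'disconnected'.
const transitions: Record<BrokerState, readonly BrokerState[]> = {
  disconnected: ['connecting', 'disconnecting'],
  connecting: ['connected', 'disconnected', 'disconnecting'],
  connected: ['disconnected', 'disconnecting'],
  disconnecting: ['disconnected'],
};

function canTransition(from: BrokerState, to: BrokerState): boolean {
  return transitions[from].indexOf(to) !== -1;
}

class Lifecycle {
  private current: BrokerState = 'disconnected';

  get state(): BrokerState {
    return this.current;
  }

  // Moves to the next state, or throws if the table does not allow it.
  transition(to: BrokerState): void {
    if (!canTransition(this.current, to)) {
      throw new Error(`illegal transition: ${this.current} -> ${to}`);
    }
    this.current = to;
  }
}
```

Keeping the legal edges in one table makes it easy to see that connected is only reachable through connecting, while a stop can enter disconnecting from any of the other three states.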

Subscribe to the lifecycle events via the event stream:

import { Actor, BrokerConnected, BrokerDisconnected, BrokerReconnectAttempt } from 'actor-ts';

class Monitor extends Actor<BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt> {
  // Register for each lifecycle event type when the actor starts.
  override preStart(): void {
    this.system.eventStream.subscribe(this.self, BrokerConnected);
    this.system.eventStream.subscribe(this.self, BrokerDisconnected);
    this.system.eventStream.subscribe(this.self, BrokerReconnectAttempt);
  }

  override onReceive(event: BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt): void {
    this.log.info(`broker event: ${event.constructor.name}`);
  }
}

The event payload includes which broker actor emitted it (by actorPath), so a single monitor can watch all brokers in the system.

When a connected broker disconnects unexpectedly, the base class schedules a reconnect with exponential backoff. The backoff is configurable via BrokerCommonSettings:

{
  reconnect: {
    minBackoffMs: 500,
    maxBackoffMs: 30_000,
    randomFactor: 0.2,
    maxAttempts: -1, // -1 = unlimited
  }
}

Each attempt fires a BrokerReconnectAttempt event. When maxAttempts is exceeded (if finite), the actor surfaces BrokerReconnectFailed and stays disconnected; messages buffered in the outbound queue are eventually dropped per the buffer policy.
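To make the reconnect settings concrete, here is a minimal sketch of how a delay could be derived from them. The field names mirror the settings shown above, but the formula (double per attempt, add up to `randomFactor` of jitter, cap at `maxBackoffMs`) is an assumption about a typical exponential backoff, not the framework's confirmed algorithm. `backoffDelayMs` and `shouldRetry` are hypothetical helpers.

```typescript
// Field names follow the reconnect settings above; the formula is an assumption.
interface ReconnectSettings {
  minBackoffMs: number;
  maxBackoffMs: number;
  randomFactor: number;
  maxAttempts: number; // -1 = unlimited
}

// Delay before the given attempt (1-based): doubles per attempt, adds up to
// randomFactor of jitter, and never exceeds maxBackoffMs.
// `random` is injectable so the result is deterministic in tests.
function backoffDelayMs(
  attempt: number,
  settings: ReconnectSettings,
  random: () => number = Math.random,
): number {
  const exponential = settings.minBackoffMs * Math.pow(2, attempt - 1);
  const jittered = exponential * (1 + random() * settings.randomFactor);
  return Math.min(settings.maxBackoffMs, Math.round(jittered));
}

// True while another attempt is allowed; a negative maxAttempts means unlimited.
function shouldRetry(attempt: number, settings: ReconnectSettings): boolean {
  return settings.maxAttempts < 0 || attempt <= settings.maxAttempts;
}
```

With the values shown above and no jitter, the delays under this formula would be 500 ms, 1000 ms, 2000 ms, and so on, until they reach the 30-second cap. The jitter spreads reconnects out so that many actors losing the same broker do not all retry at the same instant.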

While disconnected, outgoing messages don’t fail immediately — they’re enqueued in a bounded outbound buffer. When the connection restores, the buffer is drained.

Both the size and the overflow policy are configurable:

{
  outboundBuffer: {
    capacity: 1_000,
    overflow: 'drop-oldest', // or 'drop-new' or 'reject'
  }
}

When the buffer overflows, the base publishes a BrokerBufferOverflow event so you can wire it into metrics.
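The three overflow policies can be illustrated with a small standalone buffer. This is a sketch under assumptions: `BoundedBuffer`, `OfferResult`, `offer`, and `drain` are hypothetical names, and the exact semantics (for example, that 'drop-new' silently discards the incoming message while 'reject' fails the caller) are one plausible reading of the policy names rather than documented behavior.

```typescript
type OverflowPolicy = 'drop-oldest' | 'drop-new' | 'reject';
type OfferResult = 'enqueued' | 'dropped-oldest' | 'dropped-new';

// Hypothetical bounded FIFO buffer; not the actor-ts implementation.
class BoundedBuffer<T> {
  private readonly items: T[] = [];
  private readonly capacity: number;
  private readonly overflow: OverflowPolicy;

  constructor(capacity: number, overflow: OverflowPolicy) {
    if (capacity < 1) throw new Error('capacity must be at least 1');
    this.capacity = capacity;
    this.overflow = overflow;
  }

  // Adds an item, applying the overflow policy when the buffer is full.
  offer(item: T): OfferResult {
    if (this.items.length < this.capacity) {
      this.items.push(item);
      return 'enqueued';
    }
    switch (this.overflow) {
      case 'drop-oldest':
        this.items.shift(); // evict the oldest to make room
        this.items.push(item);
        return 'dropped-oldest';
      case 'drop-new':
        return 'dropped-new'; // assumed: the incoming item is discarded
      case 'reject':
        throw new Error('outbound buffer full'); // assumed: the caller sees a failure
    }
  }

  // Removes and returns everything in FIFO order, e.g. once reconnected.
  drain(): T[] {
    return this.items.splice(0, this.items.length);
  }
}
```

Under this reading, 'drop-oldest' favors fresh data (useful for telemetry), 'drop-new' preserves the earliest messages, and 'reject' pushes the decision back to the sender.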

For inbound messages (e.g., a Kafka message landing on a subscribed topic), the broker actor publishes to subscribers — actor refs registered as interested:

const kafka = system.spawn(
  Props.create(() => new KafkaActor({
    bootstrapServers: 'localhost:9092',
    consumer: { groupId: 'my-app', topics: ['orders'] },
  })),
);
const handler = system.spawnAnonymous(Props.create(() => new OrderHandler()));
kafka.tell({ kind: 'subscribe', subscriber: handler });
// Now every inbound order is told to `handler`.

Different broker actors have different subscription semantics (a single topic for MQTT, a consumer-group + topic list for Kafka), but the pattern is consistent: register a subscriber ref, receive the protocol’s inbound messages as actor messages.
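Conceptually, subscriber fan-out amounts to keeping a set of refs and telling each one about every inbound message. The sketch below shows that idea in isolation. `Ref` is a minimal stand-in for an actor ref, `FanOut` is a hypothetical helper, and the 'unsubscribe' command is an assumption (only 'subscribe' appears in the example above).

```typescript
// Minimal stand-in for an actor ref; the real ref type in actor-ts may differ.
interface Ref<M> {
  tell(message: M): void;
}

// 'subscribe' matches the message shape used above; 'unsubscribe' is assumed.
type SubscriptionCommand<M> =
  | { kind: 'subscribe'; subscriber: Ref<M> }
  | { kind: 'unsubscribe'; subscriber: Ref<M> };

// Hypothetical fan-out helper: tracks subscribers and forwards inbound messages.
class FanOut<M> {
  private readonly subscribers = new Set<Ref<M>>();

  handle(command: SubscriptionCommand<M>): void {
    if (command.kind === 'subscribe') {
      this.subscribers.add(command.subscriber);
    } else {
      this.subscribers.delete(command.subscriber);
    }
  }

  // Tells every current subscriber about one inbound message.
  publish(message: M): void {
    this.subscribers.forEach((subscriber) => subscriber.tell(message));
  }
}
```

Using a set means that subscribing the same ref twice does not produce duplicate deliveries.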

Settings are resolved from three layers, highest precedence first:

  1. Constructor argument — what you passed when spawning the actor.
  2. HOCON config under a protocol-specific key (e.g. actor-ts.io.kafka { bootstrap-servers = "..." }).
  3. Built-in defaults — sensible production defaults the actor ships with.

This means you can set system-wide defaults in application.conf and override per-instance via the constructor. Per-broker subpages spell out which keys each protocol reads.
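The precedence rule can be sketched as a merge in which later layers win. This is only an illustration: `resolveSettings` and `Layer` are hypothetical, the merge is shallow, and whether the framework merges nested objects such as `reconnect` key by key is not stated here.

```typescript
type Layer = Record<string, unknown>;

// Hypothetical resolver: applies layers from lowest to highest precedence.
// Keys whose value is undefined are skipped, so they never mask a lower layer.
function resolveSettings(builtIn: Layer, fromConfig: Layer, fromConstructor: Layer): Layer {
  const resolved: Layer = {};
  for (const layer of [builtIn, fromConfig, fromConstructor]) {
    for (const key of Object.keys(layer)) {
      if (layer[key] !== undefined) {
        resolved[key] = layer[key]; // shallow: nested objects are replaced whole
      }
    }
  }
  return resolved;
}
```

For example, if the built-in defaults name one bootstrap server, application.conf names another, and the constructor only sets a group ID, the resolved settings take the server from the config file and the group ID from the constructor.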

The I/O actors are a good fit in three cases:

  1. Bridging an external broker into the actor world — Kafka, MQTT, NATS messages need to become actor messages and vice versa. Don’t build the protocol yourself; use the provided actors.
  2. Long-lived connections to a service that pushes data — gRPC streams, WebSocket clients, SSE consumers. The broker actor manages reconnect and buffering.
  3. Mutually-exclusive resource ownership — exactly one TCP connection to a legacy service. The broker actor model serializes access naturally.

To support a protocol of your own, extend BrokerActor: implement the three abstract methods and wire up settings:

import { BrokerActor, type BrokerCommonSettings } from 'actor-ts';

class MyProtocolActor extends BrokerActor<MyOutbound> {
  // Settings wiring
  protected configKey() { return 'actor-ts.io.my-protocol'; }
  protected builtInDefaults(): BrokerCommonSettings { return { /* defaults */ }; }
  protected readSettingsFromConfig(c) { /* parse HOCON */ }
  protected requiredSettings() { return ['url']; }

  // Protocol hooks
  protected async connectImpl(): Promise<void> {
    // Establish the connection. Throw on failure.
  }
  protected async disconnectImpl(): Promise<void> {
    // Clean up the connection.
  }
  protected async dispatchOutgoing(env: OutboundEnvelope<MyOutbound>): Promise<void> {
    // Send the payload over the connection.
  }
}

The base class handles state transitions, buffering, reconnect, and event publishing. Subclasses own only the protocol-specific parts.

The BrokerActor API reference covers the full base-class surface.