I/O im Überblick
Actors erledigen In-Process-Arbeit sauber; die Außenwelt ist chaotischer — Sockets verbinden und trennen sich, Broker fallen aus, HTTP-Clients stapeln sich, Payloads überqueren Protokollgrenzen.
Die I/O-Module des Frameworks verpacken jedes Protokoll in einem
Actor, der das Protokoll hinter einem einheitlichen Vertrag
verbirgt: Connect bei preStart, Events an Subscriber publishen,
wenn Nachrichten eintreffen, ausgehende Nachrichten bei getrennter
Verbindung puffern, Reconnect mit Backoff, Lifecycle-Events auf dem
Event-Stream sichtbar machen.
Subklassen implementieren drei Hooks (connectImpl, disconnectImpl,
dispatchOutgoing); den Rest übernimmt die Basisklasse.
Die verfügbaren Broker
Abschnitt betitelt „Die verfügbaren Broker“| Modul | Protokoll | Typischer Einsatz |
|---|---|---|
KafkaActor | Apache Kafka | Durables Log mit hohem Durchsatz; Consumer-Group-Fan-out. |
MqttActor | MQTT 3.1.1 / 5 | IoT, Telemetrie; QoS 0/1/2 + Retained Messages. |
AmqpActor | RabbitMQ / AMQP 0.9.1 | Topic-/Queue-Routing, Work Queues. |
NatsActor | NATS core | Leichtgewichtiges Pub/Sub; Request/Reply. |
JetStreamActor | NATS JetStream | Durables Streaming auf NATS. |
RedisStreamsActor | Redis XADD/XREAD | Leichtgewichtiges Streaming + Consumer Groups. |
GrpcClientActor / GrpcServerActor | gRPC | Typisierter RPC + Streaming. |
WebSocketActor (Client) | WebSocket | Ausgehende Verbindung. |
ServerWebSocketActor | WebSocket | Eingehende Verbindungen (Serverseite). |
SseActor | Server-Sent Events | Einweg-Streaming vom Server. |
TcpSocketActor | Raw TCP | Eigene Protokolle. |
UdpSocketActor | Raw UDP | Telemetrie, Multicast, niedrige Latenz. |
Jeder hat seine eigene Seite in diesem Abschnitt. Diese Übersicht behandelt das gemeinsame Muster, das sie alle teilen.
Der Lifecycle-Zustandsautomat
Abschnitt betitelt „Der Lifecycle-Zustandsautomat“Lineare Übergänge im Normalbetrieb:
disconnected— Initialzustand; aktuell nicht verbunden.connecting—connectImplläuft.connected— Verbindung steht; Nachrichten fließen.disconnecting—disconnectImplläuft.
Ein Fehler während connected löst einen Übergang nach disconnected
- eine Reconnect-Schleife aus. Ein Stop läuft aus jedem Zustand über
disconnecting.
Subscribiere die Lifecycle-Events über den Event-Stream:
import { BrokerConnected, BrokerDisconnected, BrokerReconnectAttempt } from 'actor-ts';
class Monitor extends Actor<BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt> { override preStart(): void { this.system.eventStream.subscribe(this.self, BrokerConnected); this.system.eventStream.subscribe(this.self, BrokerDisconnected); this.system.eventStream.subscribe(this.self, BrokerReconnectAttempt); } override onReceive(event: BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt): void { this.log.info(`broker event: ${event.constructor.name}`); }}Das Event-Payload enthält, welcher Broker-Actor es ausgelöst hat
(über actorPath), sodass ein einzelner Monitor alle Broker im
System beobachten kann.
Reconnect mit Backoff
Abschnitt betitelt „Reconnect mit Backoff“Wenn ein verbundener Broker unerwartet die Verbindung verliert,
plant die Basisklasse einen Reconnect mit exponentiellem Backoff.
Konfigurierbar über BrokerCommonSettings:
{ reconnect: { minBackoffMs: 500, maxBackoffMs: 30_000, randomFactor: 0.2, maxAttempts: -1, // -1 = unbegrenzt }}Jeder Versuch löst ein BrokerReconnectAttempt-Event aus. Wenn
maxAttempts überschritten wird (sofern endlich), meldet der Actor
BrokerReconnectFailed und bleibt getrennt; gepufferte Nachrichten
in der Outbound-Queue werden gemäß Buffer-Policy irgendwann
verworfen.
Outbound-Buffer
Abschnitt betitelt „Outbound-Buffer“Solange die Verbindung getrennt ist, schlagen ausgehende Nachrichten nicht sofort fehl — sie werden in einem begrenzten Outbound-Buffer eingereiht. Sobald die Verbindung wiederhergestellt ist, wird der Buffer geleert.
Konfigurierbare Größe + Überlauf-Policy:
{ outboundBuffer: { capacity: 1_000, overflow: 'drop-oldest', // oder 'drop-new' oder 'reject' }}Wenn der Buffer überläuft, publisht die Basisklasse ein
BrokerBufferOverflow-Event, damit du es in Metriken verdrahten
kannst.
Subscriber-Fan-out
Abschnitt betitelt „Subscriber-Fan-out“Für eingehende Nachrichten (z. B. eine Kafka-Nachricht, die auf einem subscribierten Topic landet) publisht der Broker-Actor an Subscriber — Actor-Refs, die sich als interessiert registriert haben:
const kafka = system.spawn( Props.create(() => new KafkaActor({ bootstrapServers: 'localhost:9092', consumer: { groupId: 'my-app', topics: ['orders'] }, })),);
const handler = system.spawnAnonymous(Props.create(() => new OrderHandler()));
kafka.tell({ kind: 'subscribe', subscriber: handler });// Jede eingehende Order wird jetzt an `handler` gesendet.Verschiedene Broker-Actors haben unterschiedliche Subscription- Semantiken (ein einzelnes Topic bei MQTT, eine Consumer-Group + Topic- Liste bei Kafka), aber das Muster ist konsistent: Subscriber-Ref registrieren, eingehende Protokollnachrichten als Actor-Nachrichten empfangen.
Reihenfolge der Settings
Abschnitt betitelt „Reihenfolge der Settings“Drei Ebenen, höchste Priorität zuerst:
- Konstruktor-Argument — was du beim Spawnen des Actors übergeben hast.
- HOCON-Config unter einem protokollspezifischen Key (z. B.
actor-ts.io.kafka { bootstrap-servers = "..." }). - Eingebaute Defaults — vernünftige Produktions-Defaults, die der Actor mitbringt.
Das heißt: Du kannst systemweite Defaults in application.conf
setzen und pro Instanz über den Konstruktor überschreiben. Die
Unterseiten der einzelnen Broker buchstabieren aus, welche Keys
jedes Protokoll liest.
Wann ein Broker-Actor passt
Abschnitt betitelt „Wann ein Broker-Actor passt“Drei gute Einsatzbereiche:
- Einen externen Broker in die Actor-Welt brücken — Kafka-, MQTT-, NATS-Nachrichten müssen zu Actor-Nachrichten werden und umgekehrt. Bau das Protokoll nicht selbst; benutze die mitgelieferten Actors.
- Langlebige Verbindungen zu einem Service, der Daten pusht — gRPC-Streams, WebSocket-Clients, SSE-Consumer. Der Broker-Actor übernimmt Reconnect und Buffering.
- Sich gegenseitig ausschließender Ressourcenbesitz — genau eine TCP-Verbindung zu einem Legacy-Service. Das Broker-Actor- Modell serialisiert den Zugriff von Natur aus.
Was kein Broker-Actor ist
Abschnitt betitelt „Was kein Broker-Actor ist“Einen eigenen Broker-Actor schreiben
Abschnitt betitelt „Einen eigenen Broker-Actor schreiben“Implementiere die drei abstrakten Methoden und verdrahte die Settings:
import { BrokerActor, type BrokerCommonSettings } from 'actor-ts';
class MyProtocolActor extends BrokerActor<MyOutbound> { protected configKey() { return 'actor-ts.io.my-protocol'; } protected builtInDefaults(): BrokerCommonSettings { return { /* Defaults */ }; } protected readSettingsFromConfig(c) { /* HOCON parsen */ } protected requiredSettings() { return ['url']; }
protected async connectImpl(): Promise<void> { // Verbindung aufbauen. Bei Fehler werfen. } protected async disconnectImpl(): Promise<void> { // Verbindung aufräumen. } protected async dispatchOutgoing(env: OutboundEnvelope<MyOutbound>): Promise<void> { // Payload über die Verbindung senden. }}Die Basisklasse kümmert sich um Zustandsübergänge, Buffering, Reconnect, Event-Publishing. Subklassen besitzen nur die protokollspezifischen Teile.
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- Seiten pro Protokoll — Kafka, MQTT, AMQP, NATS, Redis Streams, gRPC, WebSocket-Client, WebSocket-Server, SSE, TCP, UDP.
- BrokerActor-Basisklasse — der gemeinsame Lifecycle und die gemeinsame Konfiguration.
- HTTP-Übersicht — das separate HTTP-Server-Modul.
- Event-Stream — wo Broker-Lifecycle-Events publisht werden.
Die BrokerActor-API-Referenz deckt
die komplette Oberfläche der Basisklasse ab.