Zum Inhalt springen
Deutsch

I/O im Überblick

Actors erledigen In-Process-Arbeit sauber; die Außenwelt ist chaotischer — Sockets verbinden und trennen sich, Broker fallen aus, HTTP-Clients stapeln sich, Payloads überqueren Protokollgrenzen.

Die I/O-Module des Frameworks verpacken jedes Protokoll in einem Actor, der das Protokoll hinter einem einheitlichen Vertrag verbirgt: Connect bei preStart, Events an Subscriber publishen, wenn Nachrichten eintreffen, ausgehende Nachrichten bei getrennter Verbindung puffern, Reconnect mit Backoff, Lifecycle-Events auf dem Event-Stream sichtbar machen.

extends

extends

extends

extends

BrokerActor (Basisklasse)

• Lifecycle-Zustandsautomat

disconnected → connecting → connected → disconnecting

• Outbound-Buffer (begrenzt)

• Reconnect mit Backoff

• Subscriber-Fan-out

• Lifecycle-Events auf system.eventStream

KafkaActor

MqttActor

NatsActor

...u. a.

Kafka-Wire

MQTT-Wire

NATS-Wire

Subklassen implementieren drei Hooks (connectImpl, disconnectImpl, dispatchOutgoing); den Rest übernimmt die Basisklasse.

ModulProtokollTypischer Einsatz
KafkaActorApache KafkaDurables Log mit hohem Durchsatz; Consumer-Group-Fan-out.
MqttActorMQTT 3.1.1 / 5IoT, Telemetrie; QoS 0/1/2 + Retained Messages.
AmqpActorRabbitMQ / AMQP 0.9.1Topic-/Queue-Routing, Work Queues.
NatsActorNATS coreLeichtgewichtiges Pub/Sub; Request/Reply.
JetStreamActorNATS JetStreamDurables Streaming auf NATS.
RedisStreamsActorRedis XADD/XREADLeichtgewichtiges Streaming + Consumer Groups.
GrpcClientActor / GrpcServerActorgRPCTypisierter RPC + Streaming.
WebSocketActor (Client)WebSocketAusgehende Verbindung.
ServerWebSocketActorWebSocketEingehende Verbindungen (Serverseite).
SseActorServer-Sent EventsEinweg-Streaming vom Server.
TcpSocketActorRaw TCPEigene Protokolle.
UdpSocketActorRaw UDPTelemetrie, Multicast, niedrige Latenz.

Jeder hat seine eigene Seite in diesem Abschnitt. Diese Übersicht behandelt das gemeinsame Muster, das sie alle teilen.

disconnected

connecting

connected

disconnecting

Lineare Übergänge im Normalbetrieb:

  • disconnected — Initialzustand; aktuell nicht verbunden.
  • connectingconnectImpl läuft.
  • connected — Verbindung steht; Nachrichten fließen.
  • disconnectingdisconnectImpl läuft.

Ein Fehler während connected löst einen Übergang nach disconnected

  • eine Reconnect-Schleife aus. Ein Stop läuft aus jedem Zustand über disconnecting.

Subscribiere die Lifecycle-Events über den Event-Stream:

import { BrokerConnected, BrokerDisconnected, BrokerReconnectAttempt } from 'actor-ts';
class Monitor extends Actor<BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt> {
override preStart(): void {
this.system.eventStream.subscribe(this.self, BrokerConnected);
this.system.eventStream.subscribe(this.self, BrokerDisconnected);
this.system.eventStream.subscribe(this.self, BrokerReconnectAttempt);
}
override onReceive(event: BrokerConnected | BrokerDisconnected | BrokerReconnectAttempt): void {
this.log.info(`broker event: ${event.constructor.name}`);
}
}

Das Event-Payload enthält, welcher Broker-Actor es ausgelöst hat (über actorPath), sodass ein einzelner Monitor alle Broker im System beobachten kann.

Wenn ein verbundener Broker unerwartet die Verbindung verliert, plant die Basisklasse einen Reconnect mit exponentiellem Backoff. Konfigurierbar über BrokerCommonSettings:

{
reconnect: {
minBackoffMs: 500,
maxBackoffMs: 30_000,
randomFactor: 0.2,
maxAttempts: -1, // -1 = unbegrenzt
}
}

Jeder Versuch löst ein BrokerReconnectAttempt-Event aus. Wenn maxAttempts überschritten wird (sofern endlich), meldet der Actor BrokerReconnectFailed und bleibt getrennt; gepufferte Nachrichten in der Outbound-Queue werden gemäß Buffer-Policy irgendwann verworfen.

Solange die Verbindung getrennt ist, schlagen ausgehende Nachrichten nicht sofort fehl — sie werden in einem begrenzten Outbound-Buffer eingereiht. Sobald die Verbindung wiederhergestellt ist, wird der Buffer geleert.

Konfigurierbare Größe + Überlauf-Policy:

{
outboundBuffer: {
capacity: 1_000,
overflow: 'drop-oldest', // oder 'drop-new' oder 'reject'
}
}

Wenn der Buffer überläuft, publisht die Basisklasse ein BrokerBufferOverflow-Event, damit du es in Metriken verdrahten kannst.

Für eingehende Nachrichten (z. B. eine Kafka-Nachricht, die auf einem subscribierten Topic landet) publisht der Broker-Actor an Subscriber — Actor-Refs, die sich als interessiert registriert haben:

const kafka = system.spawn(
Props.create(() => new KafkaActor({
bootstrapServers: 'localhost:9092',
consumer: { groupId: 'my-app', topics: ['orders'] },
})),
);
const handler = system.spawnAnonymous(Props.create(() => new OrderHandler()));
kafka.tell({ kind: 'subscribe', subscriber: handler });
// Jede eingehende Order wird jetzt an `handler` gesendet.

Verschiedene Broker-Actors haben unterschiedliche Subscription- Semantiken (ein einzelnes Topic bei MQTT, eine Consumer-Group + Topic- Liste bei Kafka), aber das Muster ist konsistent: Subscriber-Ref registrieren, eingehende Protokollnachrichten als Actor-Nachrichten empfangen.

Drei Ebenen, höchste Priorität zuerst:

  1. Konstruktor-Argument — was du beim Spawnen des Actors übergeben hast.
  2. HOCON-Config unter einem protokollspezifischen Key (z. B. actor-ts.io.kafka { bootstrap-servers = "..." }).
  3. Eingebaute Defaults — vernünftige Produktions-Defaults, die der Actor mitbringt.

Das heißt: Du kannst systemweite Defaults in application.conf setzen und pro Instanz über den Konstruktor überschreiben. Die Unterseiten der einzelnen Broker buchstabieren aus, welche Keys jedes Protokoll liest.

Drei gute Einsatzbereiche:

  1. Einen externen Broker in die Actor-Welt brücken — Kafka-, MQTT-, NATS-Nachrichten müssen zu Actor-Nachrichten werden und umgekehrt. Bau das Protokoll nicht selbst; benutze die mitgelieferten Actors.
  2. Langlebige Verbindungen zu einem Service, der Daten pusht — gRPC-Streams, WebSocket-Clients, SSE-Consumer. Der Broker-Actor übernimmt Reconnect und Buffering.
  3. Sich gegenseitig ausschließender Ressourcenbesitz — genau eine TCP-Verbindung zu einem Legacy-Service. Das Broker-Actor- Modell serialisiert den Zugriff von Natur aus.

Implementiere die drei abstrakten Methoden und verdrahte die Settings:

import { BrokerActor, type BrokerCommonSettings } from 'actor-ts';
class MyProtocolActor extends BrokerActor<MyOutbound> {
protected configKey() { return 'actor-ts.io.my-protocol'; }
protected builtInDefaults(): BrokerCommonSettings { return { /* Defaults */ }; }
protected readSettingsFromConfig(c) { /* HOCON parsen */ }
protected requiredSettings() { return ['url']; }
protected async connectImpl(): Promise<void> {
// Verbindung aufbauen. Bei Fehler werfen.
}
protected async disconnectImpl(): Promise<void> {
// Verbindung aufräumen.
}
protected async dispatchOutgoing(env: OutboundEnvelope<MyOutbound>): Promise<void> {
// Payload über die Verbindung senden.
}
}

Die Basisklasse kümmert sich um Zustandsübergänge, Buffering, Reconnect, Event-Publishing. Subklassen besitzen nur die protokollspezifischen Teile.

Die BrokerActor-API-Referenz deckt die komplette Oberfläche der Basisklasse ab.