Zum Inhalt springen
Deutsch

Transports

Der Cluster-Transport ist die Leitung zwischen Cluster-Nodes — er liefert Gossip-Nachrichten, Heartbeats und Anwendungs-Envelopes (deine tells an Remote-Actors). Zwei Implementierungen liegen dem Framework bei:

TransportVerwendung
TcpTransportProduktion. Echte TCP-Sockets, optional TLS.
InMemoryTransportTests. Schleift Frames durch In-Process-JS-Strukturen — kein Networking.

Beide implementieren dieselbe Transport-Schnittstelle, sodass das Cluster-Verhalten unabhängig vom eingesteckten Transport identisch ist.

interface Transport {
readonly self: NodeAddress;
start(): Promise<void>;
shutdown(): Promise<void>;
setHandler(handler: (from: NodeAddress, msg: WireMessage) => void): void;
send(to: NodeAddress, msg: WireMessage): void;
disconnect(peer: NodeAddress): void;
peers(): NodeAddress[];
}

Kleine Oberfläche — Bootstrap, Senden, Empfangen, Trennen. Der Cluster steckt einen Handler ein und bekommt einen Strom eingehender Wire-Nachrichten samt Absenderadresse.

import { Cluster, TcpTransport } from 'actor-ts';
const cluster = await Cluster.join(system, {
host: '0.0.0.0',
port: 2552,
seeds: ['...'],
// transport defaultet auf TcpTransport — kein expliziter Pass nötig
});

Was er tut:

  • Lauscht auf host:port auf eingehende Verbindungen.
  • Verbindet sich nach Bedarf mit Peers (beim ersten Send oder zu Seeds bei Join-Zeit).
  • Frame-Größen-Cap (Standard 16 MiB) — Frames, die größer sind, werden abgelehnt, um DoS via gefälschtem Längenpräfix zu verhindern.
  • Auto-Reconnect — fällt eine Verbindung mitten im Cluster-Leben aus, wird beim nächsten Send neu verbunden.

TcpTransport redet nicht direkt mit dem OS — er läuft über ein TcpBackend-Interface mit jeweils einer Implementierung pro Runtime:

RuntimeBackendZugrundeliegende API
BunbunTcpBackendBun.listen / Bun.connect
NodenodeTcpBackendnode:net
DenodenoTcpBackendDeno.listen / Deno.connect

Automatisch erkannt über getTcpBackend(). Normalerweise denkst du nicht darüber nach — derselbe TcpTransport läuft auf jeder Runtime.

import { Cluster, TcpTransport } from 'actor-ts';
const transport = new TcpTransport(
NodeAddress.parse('actor-ts://my-app@10.0.0.5:2552'),
system.log,
{
cert: '...', // PEM
key: '...', // PEM
ca: '...', // optionales CA-Bundle
rejectUnauthorized: true,
},
);
await Cluster.join(system, { host, port, seeds, transport });

TLS-gewrapptes TCP, alles-oder-nichts pro Cluster. Siehe Cluster-Sicherheit für das Produktionsrezept.

new TcpTransport(self, log, null, 64 * 1024 * 1024); // 64 MiB max Frame

Überschreibe den Frame-Größen-Cap pro Frame. Standard 16 MiB reicht für typischen Cluster-Traffic (Gossip, Heartbeats, kleine Envelopes). Größere Werte verbessern nicht den allgemeinen Durchsatz — sie sind nur für einzelne große Nachrichten relevant.

import { InMemoryTransport, Cluster } from 'actor-ts';
import { TestKit } from 'actor-ts/testkit';
// Gemeinsamer globaler Bus — jeder Transport mit demselben Bus kann mit den anderen reden
const bus = InMemoryTransport.newBus();
const tk1 = TestKit.create('node-1');
const tk2 = TestKit.create('node-2');
await Cluster.join(tk1.system, {
host: '1', port: 0,
seeds: ['1:0'],
transport: new InMemoryTransport({ self: 'actor-ts://node-1@1:0', bus, log: tk1.system.log }),
});
await Cluster.join(tk2.system, {
host: '2', port: 0,
seeds: ['1:0'],
transport: new InMemoryTransport({ self: 'actor-ts://node-2@2:0', bus, log: tk2.system.log }),
});

Wie es funktioniert:

  • Ein gemeinsamer Bus routet Nachrichten zwischen In-Process-Transports.
  • Jeder Transport registriert sich beim Bus per Adresse.
  • send(to, msg) schlägt den Empfänger im Bus nach und ruft dessen Handler direkt auf — keine Sockets, keine Serialisierung zu Bytes.

Wird von MultiNodeSpec — dem Multi-Node-Test-Harness — verwendet, um Multi-Node-Cluster in einem Prozess hochzufahren.

  • Netzwerkausfälle. Per Default liefert der Bus zuverlässig. Für Fault Injection würdest du den Bus mit Drop-/Delay-/Reorder-Logik erweitern.
  • Latenz. Die Zustellung ist synchron innerhalb einer Event-Loop-Drehung.
  • Serialisierung. Nachrichten werden per Referenz, nicht in Bytes übergeben. Wenn dein Test tatsächlich Serialisierung üben soll (z. B. CBOR-Codec testen), nutze stattdessen TcpTransport über Loopback.

Transport gegen eine andere Drahtverbindung zu implementieren ist selten, aber möglich. Beispiele:

  • WebSocket-Transport — für Cluster-Teilnehmer im Browser (theoretisch; nicht implementiert).
  • MessageChannel-Transport — für Cluster mit Worker-Threads in einem einzelnen OS-Prozess. Wird vom “Worker-Mesh”-Muster genutzt.

Die Schnittstelle ist klein genug, dass eine kompetente Implementierung ~200 Zeilen Code ist; die Schwierigkeit liegt darin, die Framing- und Heartbeat-Semantik zu treffen, die der Cluster erwartet.

const peers = transport.peers(); // aktuell verbundene Adressen

Der Transport legt keine Metriken pro Verbindung direkt offen — nutze die Metrik-Extension des Clusters, um Verbindungszahlen und gesendete/empfangene Bytes pro Peer zu bekommen.

Für tiefergehende Inspektion (spezifische Frame-Inhalte) aktiviere Debug-Logging am System:

const system = ActorSystem.create('my-app', { logLevel: 'debug' });
// Suche nach [tcp-transport] Log-Zeilen

Eine einzige TCP-Verbindung zwischen zwei Nodes trägt:

  • Gossip-Nachrichten — Cluster-Mitgliedschaftsaustausche.
  • Heartbeat-Nachrichten — Failure Detection.
  • Envelope-Nachrichten — deine tells, mit Routing-Info kodiert.
  • Subsystem-Nachrichten — Sharding-Protokoll, PubSub-Gossip, DistributedData-Replikation.

Alles auf denselben TCP-Stream gemultiplext. Es gibt kein Prioritäts-Routing — Heartbeats und dein Bulk-Traffic teilen sich die Leitung. Für die meisten Workloads ist das in Ordnung; für explizite Isolation (Bandbreite für Cluster-Steuerung reservieren) bräuchte man einen eigenen Transport mit Per-Channel-Framing.