ProducerController
ProducerController umwickelt die ausgehenden Nachrichten eines
Senders, vergibt Sequence Numbers und hält sie, bis sie vom
Consumer geACKt sind.
import { Props, ProducerController } from 'actor-ts';
const producer = system.spawn( Props.create(() => new ProducerController<OrderEvent>({ producerId: 'orders', // stabile Identität consumer: consumerRef, windowSize: 16, // Flow-Control-Window resendTimeoutMs: 500, })), 'producer',);
producer.tell({ kind: 'reliable-delivery.send', body: { orderId: 'o-1' } });Intern:
- Vergibt Seq 1 für die erste Nachricht, 2 für die nächste, …
- Sendet jede via Wrapped Tell an den Consumer.
- Hält sie in einem Buffer, bis sie geACKt sind.
- Retransmittiert, wenn binnen
resendTimeoutMskein ACK kommt.
Konfiguration
Abschnitt betitelt „Konfiguration“interface ProducerControllerSettings<T> { consumer: ActorRef<Delivery<T>>; resendTimeoutMs?: number; // Default 500 windowSize?: number; // Default 16 producerId?: string; // wird auto-generiert, wenn weggelassen}| Feld | Zweck |
|---|---|
consumer | Die ConsumerController-Ref, an die zugestellt wird. |
resendTimeoutMs | Wie lange auf ein ACK gewartet wird, bevor retransmittiert wird. |
windowSize | Flow-Control-Window — pausiert das Queuing nach N unbestätigten In-Flight-Nachrichten. |
producerId | Stabiler Identifier, mit dem der Consumer über Restarts hinweg dedupliziert. Wird auto-generiert, wenn weggelassen; bei persistierten Producern willst Du einen stabilen String, damit Recovery matcht. |
Backpressure
Abschnitt betitelt „Backpressure“windowSize: 16Der Producer hält bis zu 16 unbestätigte Nachrichten in Flight.
Darüber hinaus werden eingehende
{ kind: 'reliable-delivery.send' }-Nachrichten
eingereiht — das tell des Callers ist erfolgreich, aber das
eigentliche Senden pausiert, bis ACKs den Slot freigeben.
Für Sender, die wissen müssen, wann Druck herrscht:
const result = await producer.ask({ kind: 'reliable-delivery.send', body, replyTo: ... }, 30_000);// ^ antwortet, nachdem die Nachricht mindestens gesendet wurdeask-Sends warten auf die “Ich habe sie gesendet”-Antwort des
Producers und geben dir damit ein natürliches Backpressure-Signal.
Retransmission
Abschnitt betitelt „Retransmission“Wenn ein ACK nicht binnen resendTimeoutMs eintrifft:
Seq 5 zum Zeitpunkt t=0 gesendetACK für Seq 5 kommt niebei t=500 → Seq 5 retransmittenbei t=1000 → Seq 5 retransmitten... bis das ACK kommt oder der Producer stopptRetransmits nutzen dieselbe Seq wie das Original. Der Consumer dedupliziert anhand der Seq.
State + Restart
Abschnitt betitelt „State + Restart“// Ohne Persistenz:Producer crasht → Buffer weg → unbestätigte Nachrichten sind verloren// Beim Restart setzt das Senden bei Seq 1 wieder ein (kein Resume)
// Mit ProducerController + PersistentActor-Wrapper:Producer crasht → Recovery baut Buffer + last-acked-Seq wieder auf// Beim Restart setzt das Senden bei der richtigen Seq fortFür volle Durability den Producer in ein
PersistentActor-Muster einwickeln — ausgehende Nachrichten
persistieren, bevor gesendet wird; beim Restart unbestätigte
neu abspielen.
Das einfache ProducerController des Frameworks ist in-memory
— ausreichend für ephemere Streams. Für durable Streams legst
du Persistenz darauf.
Mehrere Consumer
Abschnitt betitelt „Mehrere Consumer“// Ein Producer pro Consumer:const p1 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer: c1Ref, ... })));const p2 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer: c2Ref, ... })));Jeder ProducerController ist 1:1 mit einem ConsumerController. Für Fan-out senden mehrere Producer an verschiedene Consumer.
Für routing-basiertes Fan-out (ein logischer Stream auf N Consumer aufgeteilt) würdest du einen eigenen Router darüber schreiben.
Cluster-aware
Abschnitt betitelt „Cluster-aware“Die consumer-Ref des ProducerController kann auf einen
Remote-Consumer zeigen (anderer Cluster-Node). Der
Cluster-Transport serialisiert Envelopes; Retransmissions
funktionieren gleich.
Der Producer hält den Buffer lokal — bei einem Crash des Producer-Hosts sind diese Nachrichten verloren, sofern nicht persistiert.
Vergleich mit rohem tell
Abschnitt betitelt „Vergleich mit rohem tell“ref.tell(msg): ProducerController:- Keine Seq, kein ACK, kein Retransmit - Vertrag zuverlässige Zustellung- Verloren bei totem Empfänger - Übersteht transiente Fehler- Sub-Mikrosekunden-Kosten - ~50µs Overhead pro NachrichtNimm den Controller nur für Streams, in denen Verlust inakzeptabel
ist; rohes tell für alles andere.
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- Delivery im Überblick — das Gesamtbild.
- ConsumerController — die Empfänger-Seite.
- ACK-Semantik — wann ACKs feuern und was sie bedeuten.
- PersistentActor — für durable Producer-State.