ConsumerController
ConsumerController ist die Empfänger-Seite zuverlässiger
Zustellung. Er:
- Empfängt
Delivery<T>-Envelopes von einem oder mehrerenProducerControllern. - Dedupliziert Duplikate (gleiches
producerId+seq). - Ruft eine User-Handler-Funktion mit dem Message-Body auf.
- ACKt automatisch an den Producer zurück, sobald der Handler resolved; wirft der Handler, kein ACK — der Producer retransmittiert.
import { Props, ConsumerController } from 'actor-ts';
const consumer = system.spawn( Props.create(() => new ConsumerController<OrderEvent>({ handler: async (order) => { await processOrder(order); }, })),);Die Nachrichten des Producers fließen:
Konfiguration
Abschnitt betitelt „Konfiguration“interface ConsumerControllerSettings<T> { handler: (body: T) => void | Promise<void>;}Die einzige Einstellung ist der Handler. Den Dedup-State pro Producer hält der Controller intern — Du konfigurierst weder eine Buffer-Größe noch eine Consumer-ID.
Handler-Kontrakt
Abschnitt betitelt „Handler-Kontrakt“Der Controller ruft handler(body) einmal pro neuem
(producerId, seq)-Paar auf. Sobald der Handler resolved,
schickt der Controller ein ACK zurück an den Producer (via
Delivery.replyTo), und der Producer gibt seinen In-Flight-Slot
frei.
new ConsumerController<OrderEvent>({ handler: async (order) => { await processOrder(order); // ACK wird automatisch gesendet, sobald das hier zurückkehrt. },});Handler wirft oder rejected → kein ACK → Producer retransmittiert
Abschnitt betitelt „Handler wirft oder rejected → kein ACK → Producer retransmittiert“new ConsumerController<OrderEvent>({ handler: async (order) => { if (!isValid(order)) { throw new Error('invalid order'); } await processOrder(order); },});Werfen (oder ein gerejectetes Promise zurückgeben) verhindert
das ACK. Der resendTimeoutMs-Timer des Producers feuert und
schickt dieselbe Seq erneut. Das ist der einzige
“NACK”-Mechanismus — es gibt keine explizite NACK-Nachricht.
Duplikat-Behandlung läuft automatisch
Abschnitt betitelt „Duplikat-Behandlung läuft automatisch“Wenn dasselbe (producerId, seq) erneut ankommt (z. B. weil der
Producer retransmittiert hat, bevor unser ACK ihn erreicht hat),
überspringt der Controller den Handler und schickt einfach
nochmal das ACK. Kein Applikations-Code nötig — der Controller
hält contiguous und Out-of-order-Seqs pro Producer intern fest.
Für fachliche Idempotenz (z. B. “eine Kreditkarte nicht doppelt belasten, auch nicht über Consumer-Neustarts hinweg”) persistiere Deine eigene processed-Seq zusammen mit dem Business-State — der In-Memory-Dedup des Controllers ist nach einem Restart weg.
Out-of-order-Behandlung
Abschnitt betitelt „Out-of-order-Behandlung“Producer sendet 1, 2, 3Netzwerk liefert: 1, 3, 2ConsumerController hält am Start { contiguous: 1, above: {} }Nach Ankunft von 3 → { contiguous: 1, above: {3} } (Handler lief für seq 3)Nach Ankunft von 2 → { contiguous: 3, above: {} } (Handler lief für seq 2, dann gleitet contiguous bis 3)Der Controller pflegt einen contiguous-High-Watermark plus
eine Menge von Out-of-order-Seqs, die darüber bereits ausgeliefert
wurden. Jede neu ankommende Seq wird gegen beides geprüft —
Duplikate überspringen den Handler und re-ACKen; neue Seqs
führen den Handler aus und updaten danach den State.
Es gibt keine feste Buffer-Größe und keine stillen Drops — der Producer retransmittiert ungeACKte Nachrichten weiter, bis der Handler resolved und das ACK ankommt.
Failure-Semantik
Abschnitt betitelt „Failure-Semantik“Consumer crasht mitten im Handler: - Handler resolved nie → kein ACK gesendet - Producer retransmittiert nach `resendTimeoutMs` - Neuer Consumer (oder restarteter alter) sieht die Seq erneut - Der In-Memory-Dedup-State des Controllers ist weg, Handler läuft erneutat-least-once-Zustellung; Idempotenz liegt in Deiner Verantwortung. Für Consumer-Crashes, die nicht erneut verarbeiten dürfen, persistiere die processed-Seq zusammen mit dem Business-State im Handler.
Mehrere Producer
Abschnitt betitelt „Mehrere Producer“const consumer = system.spawn( Props.create(() => new ConsumerController<OrderEvent>({ handler: async (order) => { await processOrder(order); }, })),);
// Mehrere Producer senden an denselben Consumer:const p1 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer, producerId: 'p1' })));const p2 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer, producerId: 'p2' })));Jeder Producer pflegt seine eigene Seq. Der Consumer
dedupliziert pro Producer automatisch — er hält einen
separaten (contiguous, above)-Dedup-State pro producerId,
sodass die Seq-Räume zweier Producer nicht kollidieren.
Dein Handler sieht nur den Body — producerId und seq sind
die Sache des Controllers. Falls Du sie für Business-Logik
brauchst, persistiere einen Wrapper, der sie durch Deine
Verarbeitungs-Pipeline trägt.
Cluster-aware
Abschnitt betitelt „Cluster-aware“Die consumer-Ref des Producers kann auf einen Remote-Consumer
zeigen (anderer Cluster-Node). Der Cluster-Transport
serialisiert die Envelopes; der ACK-Pfad nutzt denselben
Transport in Gegenrichtung.
In der Praxis: platziere den Consumer nah an den Daten seines Handlers — wenn der Handler eine lokale Datenbank trifft, lass den Consumer auf demselben Node laufen wie die Datenbank. Cross-cluster-Delivery ist für durchsatzgebundene Workloads in Ordnung, fügt aber pro ACK eine Round-Trip hinzu.
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- Delivery im Überblick — das Gesamtbild.
- ProducerController — die Sender-Seite.
- ACK-Semantik — wann ACKs feuern.
- PersistentActor — zum Persistieren der processed-Seq, wenn Du Effectively-once-Verarbeitung über Consumer-Restarts hinweg brauchst.