Zum Inhalt springen
Deutsch

ConsumerController

ConsumerController ist die Empfänger-Seite zuverlässiger Zustellung. Er:

  • Empfängt Delivery<T>-Envelopes von einem oder mehreren ProducerControllern.
  • Dedupliziert Duplikate (gleiches producerId + seq).
  • Ruft eine User-Handler-Funktion mit dem Message-Body auf.
  • ACKt automatisch an den Producer zurück, sobald der Handler resolved; wirft der Handler, kein ACK — der Producer retransmittiert.
import { Props, ConsumerController } from 'actor-ts';
const consumer = system.spawn(
Props.create(() => new ConsumerController<OrderEvent>({
handler: async (order) => {
await processOrder(order);
},
})),
);

Die Nachrichten des Producers fließen:

Auto-ACK bei Resolve

ProducerController

ConsumerController

handler(body)

interface ConsumerControllerSettings<T> {
handler: (body: T) => void | Promise<void>;
}

Die einzige Einstellung ist der Handler. Den Dedup-State pro Producer hält der Controller intern — Du konfigurierst weder eine Buffer-Größe noch eine Consumer-ID.

Der Controller ruft handler(body) einmal pro neuem (producerId, seq)-Paar auf. Sobald der Handler resolved, schickt der Controller ein ACK zurück an den Producer (via Delivery.replyTo), und der Producer gibt seinen In-Flight-Slot frei.

new ConsumerController<OrderEvent>({
handler: async (order) => {
await processOrder(order);
// ACK wird automatisch gesendet, sobald das hier zurückkehrt.
},
});

Handler wirft oder rejected → kein ACK → Producer retransmittiert

Abschnitt betitelt „Handler wirft oder rejected → kein ACK → Producer retransmittiert“
new ConsumerController<OrderEvent>({
handler: async (order) => {
if (!isValid(order)) {
throw new Error('invalid order');
}
await processOrder(order);
},
});

Werfen (oder ein gerejectetes Promise zurückgeben) verhindert das ACK. Der resendTimeoutMs-Timer des Producers feuert und schickt dieselbe Seq erneut. Das ist der einzige “NACK”-Mechanismus — es gibt keine explizite NACK-Nachricht.

Wenn dasselbe (producerId, seq) erneut ankommt (z. B. weil der Producer retransmittiert hat, bevor unser ACK ihn erreicht hat), überspringt der Controller den Handler und schickt einfach nochmal das ACK. Kein Applikations-Code nötig — der Controller hält contiguous und Out-of-order-Seqs pro Producer intern fest.

Für fachliche Idempotenz (z. B. “eine Kreditkarte nicht doppelt belasten, auch nicht über Consumer-Neustarts hinweg”) persistiere Deine eigene processed-Seq zusammen mit dem Business-State — der In-Memory-Dedup des Controllers ist nach einem Restart weg.

Producer sendet 1, 2, 3
Netzwerk liefert: 1, 3, 2
ConsumerController hält am Start { contiguous: 1, above: {} }
Nach Ankunft von 3 → { contiguous: 1, above: {3} } (Handler lief für seq 3)
Nach Ankunft von 2 → { contiguous: 3, above: {} } (Handler lief für seq 2,
dann gleitet contiguous bis 3)

Der Controller pflegt einen contiguous-High-Watermark plus eine Menge von Out-of-order-Seqs, die darüber bereits ausgeliefert wurden. Jede neu ankommende Seq wird gegen beides geprüft — Duplikate überspringen den Handler und re-ACKen; neue Seqs führen den Handler aus und updaten danach den State.

Es gibt keine feste Buffer-Größe und keine stillen Drops — der Producer retransmittiert ungeACKte Nachrichten weiter, bis der Handler resolved und das ACK ankommt.

Consumer crasht mitten im Handler:
- Handler resolved nie → kein ACK gesendet
- Producer retransmittiert nach `resendTimeoutMs`
- Neuer Consumer (oder restarteter alter) sieht die Seq erneut
- Der In-Memory-Dedup-State des Controllers ist weg, Handler läuft erneut

at-least-once-Zustellung; Idempotenz liegt in Deiner Verantwortung. Für Consumer-Crashes, die nicht erneut verarbeiten dürfen, persistiere die processed-Seq zusammen mit dem Business-State im Handler.

const consumer = system.spawn(
Props.create(() => new ConsumerController<OrderEvent>({
handler: async (order) => { await processOrder(order); },
})),
);
// Mehrere Producer senden an denselben Consumer:
const p1 = system.spawnAnonymous(Props.create(() =>
new ProducerController({ consumer, producerId: 'p1' })));
const p2 = system.spawnAnonymous(Props.create(() =>
new ProducerController({ consumer, producerId: 'p2' })));

Jeder Producer pflegt seine eigene Seq. Der Consumer dedupliziert pro Producer automatisch — er hält einen separaten (contiguous, above)-Dedup-State pro producerId, sodass die Seq-Räume zweier Producer nicht kollidieren.

Dein Handler sieht nur den Body — producerId und seq sind die Sache des Controllers. Falls Du sie für Business-Logik brauchst, persistiere einen Wrapper, der sie durch Deine Verarbeitungs-Pipeline trägt.

Die consumer-Ref des Producers kann auf einen Remote-Consumer zeigen (anderer Cluster-Node). Der Cluster-Transport serialisiert die Envelopes; der ACK-Pfad nutzt denselben Transport in Gegenrichtung.

In der Praxis: platziere den Consumer nah an den Daten seines Handlers — wenn der Handler eine lokale Datenbank trifft, lass den Consumer auf demselben Node laufen wie die Datenbank. Cross-cluster-Delivery ist für durchsatzgebundene Workloads in Ordnung, fügt aber pro ACK eine Round-Trip hinzu.