Zum Inhalt springen
Deutsch

ProducerController

ProducerController umwickelt die ausgehenden Nachrichten eines Senders, vergibt Sequence Numbers und hält sie, bis sie vom Consumer geACKt sind.

import { Props, ProducerController } from 'actor-ts';
const producer = system.spawn(
Props.create(() => new ProducerController<OrderEvent>({
producerId: 'orders', // stabile Identität
consumer: consumerRef,
windowSize: 16, // Flow-Control-Window
resendTimeoutMs: 500,
})),
'producer',
);
producer.tell({ kind: 'reliable-delivery.send', body: { orderId: 'o-1' } });

Intern:

  • Vergibt Seq 1 für die erste Nachricht, 2 für die nächste, …
  • Sendet jede via Wrapped Tell an den Consumer.
  • Hält sie in einem Buffer, bis sie geACKt sind.
  • Retransmittiert, wenn binnen resendTimeoutMs kein ACK kommt.
interface ProducerControllerSettings<T> {
consumer: ActorRef<Delivery<T>>;
resendTimeoutMs?: number; // Default 500
windowSize?: number; // Default 16
producerId?: string; // wird auto-generiert, wenn weggelassen
}
FeldZweck
consumerDie ConsumerController-Ref, an die zugestellt wird.
resendTimeoutMsWie lange auf ein ACK gewartet wird, bevor retransmittiert wird.
windowSizeFlow-Control-Window — pausiert das Queuing nach N unbestätigten In-Flight-Nachrichten.
producerIdStabiler Identifier, mit dem der Consumer über Restarts hinweg dedupliziert. Wird auto-generiert, wenn weggelassen; bei persistierten Producern willst Du einen stabilen String, damit Recovery matcht.
windowSize: 16

Der Producer hält bis zu 16 unbestätigte Nachrichten in Flight. Darüber hinaus werden eingehende { kind: 'reliable-delivery.send' }-Nachrichten eingereiht — das tell des Callers ist erfolgreich, aber das eigentliche Senden pausiert, bis ACKs den Slot freigeben.

Für Sender, die wissen müssen, wann Druck herrscht:

const result = await producer.ask({ kind: 'reliable-delivery.send', body, replyTo: ... }, 30_000);
// ^ antwortet, nachdem die Nachricht mindestens gesendet wurde

ask-Sends warten auf die “Ich habe sie gesendet”-Antwort des Producers und geben dir damit ein natürliches Backpressure-Signal.

Wenn ein ACK nicht binnen resendTimeoutMs eintrifft:

Seq 5 zum Zeitpunkt t=0 gesendet
ACK für Seq 5 kommt nie
bei t=500 → Seq 5 retransmitten
bei t=1000 → Seq 5 retransmitten
... bis das ACK kommt oder der Producer stoppt

Retransmits nutzen dieselbe Seq wie das Original. Der Consumer dedupliziert anhand der Seq.

// Ohne Persistenz:
Producer crasht → Buffer weg → unbestätigte Nachrichten sind verloren
// Beim Restart setzt das Senden bei Seq 1 wieder ein (kein Resume)
// Mit ProducerController + PersistentActor-Wrapper:
Producer crasht → Recovery baut Buffer + last-acked-Seq wieder auf
// Beim Restart setzt das Senden bei der richtigen Seq fort

Für volle Durability den Producer in ein PersistentActor-Muster einwickeln — ausgehende Nachrichten persistieren, bevor gesendet wird; beim Restart unbestätigte neu abspielen.

Das einfache ProducerController des Frameworks ist in-memory — ausreichend für ephemere Streams. Für durable Streams legst du Persistenz darauf.

// Ein Producer pro Consumer:
const p1 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer: c1Ref, ... })));
const p2 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer: c2Ref, ... })));

Jeder ProducerController ist 1:1 mit einem ConsumerController. Für Fan-out senden mehrere Producer an verschiedene Consumer.

Für routing-basiertes Fan-out (ein logischer Stream auf N Consumer aufgeteilt) würdest du einen eigenen Router darüber schreiben.

Die consumer-Ref des ProducerController kann auf einen Remote-Consumer zeigen (anderer Cluster-Node). Der Cluster-Transport serialisiert Envelopes; Retransmissions funktionieren gleich.

Der Producer hält den Buffer lokal — bei einem Crash des Producer-Hosts sind diese Nachrichten verloren, sofern nicht persistiert.

ref.tell(msg): ProducerController:
- Keine Seq, kein ACK, kein Retransmit - Vertrag zuverlässige Zustellung
- Verloren bei totem Empfänger - Übersteht transiente Fehler
- Sub-Mikrosekunden-Kosten - ~50µs Overhead pro Nachricht

Nimm den Controller nur für Streams, in denen Verlust inakzeptabel ist; rohes tell für alles andere.