ProducerController

ProducerController wraps a sender's outgoing messages: it assigns each one a sequence number and holds it in a buffer until the consumer acknowledges it.

import { Props, ProducerController } from 'actor-ts';

const producer = system.spawn(
  Props.create(() => new ProducerController<OrderEvent>({
    producerId: 'orders',      // stable identity
    consumer: consumerRef,
    windowSize: 16,            // flow-control window
    resendTimeoutMs: 500,
  })),
  'producer',
);

producer.tell({ kind: 'reliable-delivery.send', body: { orderId: 'o-1' } });

Internally:

  • Assigns seq 1 to the first message, 2 to the next, …
  • Sends each via wrapped tell to the consumer.
  • Holds in a buffer until acked.
  • Retransmits if no ack within resendTimeoutMs.
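
The bookkeeping in the steps above can be modelled in a few lines. This is a minimal standalone sketch, not the actor-ts implementation: `SeqBuffer`, `Envelope`, and the `deliver` callback are hypothetical names introduced here for illustration, and the retransmit timer is reduced to an explicit `resendUnacked()` call.

```typescript
// Illustrative model only; all names here are assumptions, not actor-ts APIs.
interface Envelope<T> {
  producerId: string;
  seq: number;
  body: T;
}

class SeqBuffer<T> {
  private readonly producerId: string;
  private readonly deliver: (env: Envelope<T>) => void;
  private nextSeq = 1;
  private readonly unacked = new Map<number, Envelope<T>>();

  constructor(producerId: string, deliver: (env: Envelope<T>) => void) {
    this.producerId = producerId;
    this.deliver = deliver;
  }

  // Assign the next sequence number, remember the envelope, then send it.
  send(body: T): number {
    const env: Envelope<T> = { producerId: this.producerId, seq: this.nextSeq++, body };
    this.unacked.set(env.seq, env);
    this.deliver(env);
    return env.seq;
  }

  // Drop an envelope once the consumer has confirmed it.
  ack(seq: number): void {
    this.unacked.delete(seq);
  }

  // Re-deliver everything still unacknowledged, keeping the original seq values.
  resendUnacked(): void {
    this.unacked.forEach((env) => this.deliver(env));
  }

  pendingSeqs(): number[] {
    return Array.from(this.unacked.keys());
  }
}
```

Keeping the envelope in the map until `ack` is what makes a retransmit possible: the resend reuses the stored envelope, so the seq never changes.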

interface ProducerControllerSettings<T> {
  consumer: ActorRef<Delivery<T>>;
  resendTimeoutMs?: number;   // default 500
  windowSize?: number;        // default 16
  producerId?: string;        // auto-generated if omitted
}

Field | Purpose
consumer | The ConsumerController ref to deliver to.
resendTimeoutMs | How long to wait for an ack before retransmitting.
windowSize | Flow-control window: sending pauses once N unacked messages are in flight.
producerId | Stable identifier used by the consumer to dedup across restarts. Auto-generated if omitted, but for persisted producers you'll want a stable string so recovery matches up.

windowSize: 16

The producer holds up to 16 unacked messages in flight. Beyond that, incoming { kind: 'reliable-delivery.send' } messages are queued — the caller’s tell succeeds, but actual sending pauses until the consumer’s acks free up the buffer.
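
To make the window behaviour concrete, here is a small standalone model. It is a sketch under assumptions, not the library's code: `WindowedSender` and its `transmit` callback are hypothetical, and acks are fed in by hand.

```typescript
// Illustrative model of a flow-control window; names are assumptions.
class WindowedSender<T> {
  private readonly windowSize: number;
  private readonly transmit: (seq: number, body: T) => void;
  private nextSeq = 1;
  private readonly inFlight = new Set<number>();
  private readonly queued: T[] = [];

  constructor(windowSize: number, transmit: (seq: number, body: T) => void) {
    this.windowSize = windowSize;
    this.transmit = transmit;
  }

  // Always accepts the message; it is transmitted only if the window has room.
  send(body: T): void {
    this.queued.push(body);
    this.drain();
  }

  // An ack frees one slot, which may release a queued message.
  ack(seq: number): void {
    if (this.inFlight.delete(seq)) this.drain();
  }

  private drain(): void {
    while (this.inFlight.size < this.windowSize && this.queued.length > 0) {
      const body = this.queued.shift() as T;
      const seq = this.nextSeq++;
      this.inFlight.add(seq);
      this.transmit(seq, body);
    }
  }

  inFlightCount(): number {
    return this.inFlight.size;
  }

  queuedCount(): number {
    return this.queued.length;
  }
}
```

In this model `send` never rejects, which mirrors the "tell succeeds" behaviour described above; the queue itself is unbounded, so a caller that outpaces the consumer indefinitely still needs some other signal.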

For senders that need to know about pressure:

const result = await producer.ask({ kind: 'reliable-delivery.send', body, replyTo: ... }, 30_000);
// ^ replies after the message is at least sent

ask-style sends wait for the producer’s “I sent it” reply, giving you a natural backpressure signal.
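
One way to use that signal is to await each reply before issuing the next send, so at most one request is outstanding per caller. The sketch below is generic and assumes nothing about the actor-ts `ask` signature: the hypothetical `AskFn` stands in for whatever wrapper you build around `producer.ask(...)`.

```typescript
// Hypothetical wrapper type: resolves once the producer replies to the send.
type AskFn<T> = (body: T) => Promise<void>;

// Sends bodies one at a time; the loop cannot run ahead of the producer's replies.
async function sendSequentially<T>(ask: AskFn<T>, bodies: T[]): Promise<number> {
  let confirmed = 0;
  for (let i = 0; i < bodies.length; i++) {
    await ask(bodies[i]);
    confirmed++;
  }
  return confirmed;
}
```

Strictly sequential sending is the simplest choice and also the slowest; a bounded number of concurrent asks would trade some of that safety for throughput.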

When an ack doesn’t arrive within resendTimeoutMs:

seq 5 sent at t=0
seq 5 ack never arrives
at t=500 → retransmit seq 5
at t=1000 → retransmit seq 5
... until ack arrives or producer stops

Retransmits use the same seq as the original. The consumer dedups based on the seq.
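
Both halves of that contract can be sketched independently of the framework. The code below is an illustration under assumptions: `ResendTracker` and `DedupFilter` are hypothetical names, time is passed in explicitly instead of using real timers, and the dedup rule assumes messages from one producer arrive in seq order.

```typescript
// Producer side: decides which seqs are overdue for a retransmit.
class ResendTracker {
  private readonly resendTimeoutMs: number;
  private readonly lastSentAt = new Map<number, number>();

  constructor(resendTimeoutMs: number) {
    this.resendTimeoutMs = resendTimeoutMs;
  }

  recordSend(seq: number, nowMs: number): void {
    this.lastSentAt.set(seq, nowMs);
  }

  ack(seq: number): void {
    this.lastSentAt.delete(seq);
  }

  // Returns the seqs whose ack is overdue and restarts their timers.
  due(nowMs: number): number[] {
    const overdue: number[] = [];
    this.lastSentAt.forEach((sentAt, seq) => {
      if (nowMs - sentAt >= this.resendTimeoutMs) overdue.push(seq);
    });
    overdue.forEach((seq) => this.lastSentAt.set(seq, nowMs));
    return overdue;
  }
}

// Consumer side: accepts a seq once per producer and drops repeats.
// Assumes in-order arrival; gap handling is deliberately left out.
class DedupFilter {
  private readonly lastSeen = new Map<string, number>();

  accept(producerId: string, seq: number): boolean {
    const last = this.lastSeen.get(producerId) ?? 0;
    if (seq <= last) return false;
    this.lastSeen.set(producerId, seq);
    return true;
  }
}
```

Because the retransmit carries the original seq, the consumer needs only the highest seq it has processed per producer to recognise a duplicate.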

// Without persistence:
//   producer crashes → buffer lost → unacked messages are gone
//   on restart, sequence numbering starts again at 1 (nothing is resumed)
// With ProducerController + PersistentActor wrapper:
//   producer crashes → recovery rebuilds buffer + last-acked-seq
//   on restart, sending resumes from the correct seq

For full durability, wrap the producer in a PersistentActor pattern — persist outgoing messages before sending; on restart, replay any unacked.

The framework’s plain ProducerController is in-memory — sufficient for ephemeral streams. For durable streams, layer persistence on top.
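
The persist-before-send idea can be sketched without any framework support. Everything below is an assumption made for illustration: `Journal`, `InMemoryJournal`, and `DurableProducer` are hypothetical names, and the in-memory journal merely stands in for real durable storage. It does not show the actor-ts PersistentActor API.

```typescript
interface Entry<T> {
  seq: number;
  body: T;
}

// Hypothetical storage interface; a real one would write to disk or a database.
interface Journal<T> {
  append(entry: Entry<T>): void;
  markAcked(seq: number): void;
  unacked(): Entry<T>[];
  highestSeq(): number;
}

class InMemoryJournal<T> implements Journal<T> {
  private readonly entries: Entry<T>[] = [];
  private readonly acked = new Set<number>();
  private maxSeq = 0;

  append(entry: Entry<T>): void {
    this.entries.push(entry);
    if (entry.seq > this.maxSeq) this.maxSeq = entry.seq;
  }

  markAcked(seq: number): void {
    this.acked.add(seq);
  }

  unacked(): Entry<T>[] {
    return this.entries.filter((e) => !this.acked.has(e.seq));
  }

  highestSeq(): number {
    return this.maxSeq;
  }
}

class DurableProducer<T> {
  private readonly journal: Journal<T>;
  private readonly transmit: (entry: Entry<T>) => void;
  private nextSeq: number;

  constructor(journal: Journal<T>, transmit: (entry: Entry<T>) => void) {
    this.journal = journal;
    this.transmit = transmit;
    // Recovery: continue numbering after the last persisted seq,
    // then replay every entry that was never acked.
    this.nextSeq = journal.highestSeq() + 1;
    journal.unacked().forEach((e) => transmit(e));
  }

  send(body: T): number {
    const entry: Entry<T> = { seq: this.nextSeq++, body };
    this.journal.append(entry); // persist first, so a crash cannot lose it
    this.transmit(entry);
    return entry.seq;
  }

  ack(seq: number): void {
    this.journal.markAcked(seq);
  }
}
```

Constructing a second `DurableProducer` over the same journal plays the role of a restart: unacked entries go out again with their original seq, and new sends continue from the next number.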

// One producer per consumer:
const p1 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer: c1Ref, ... })));
const p2 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer: c2Ref, ... })));

Each ProducerController is 1:1 with one ConsumerController. For fan-out, spawn one producer per consumer.

For routing-based fan-out (one logical stream split across N consumers), you’d write a custom router on top.
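
As a rough idea of what such a router could look like, here is a round-robin sketch. It is not part of actor-ts: `RoundRobinRouter` and `Tellable` are hypothetical, and the only detail taken from this page is the `{ kind: 'reliable-delivery.send', body }` message shape.

```typescript
// Message shape as used elsewhere on this page.
interface SendCommand<T> {
  kind: 'reliable-delivery.send';
  body: T;
}

// Minimal stand-in for an actor ref; assumed for illustration.
interface Tellable<M> {
  tell(msg: M): void;
}

class RoundRobinRouter<T> {
  private readonly producers: Tellable<SendCommand<T>>[];
  private next = 0;

  constructor(producers: Tellable<SendCommand<T>>[]) {
    if (producers.length === 0) throw new Error('at least one producer is required');
    this.producers = producers.slice();
  }

  // Forwards the body to the next producer in turn and returns its index.
  route(body: T): number {
    const index = this.next;
    this.next = (this.next + 1) % this.producers.length;
    this.producers[index].tell({ kind: 'reliable-delivery.send', body });
    return index;
  }
}
```

Round-robin spreads load evenly but gives no ordering guarantee across consumers; if related messages must stay together, routing by a hash of a key would be the usual alternative.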

ProducerController's consumer ref can point at a remote consumer on a different cluster node. The cluster transport serializes the envelopes, and retransmissions work the same way as for a local consumer.

The producer holds the buffer locally — on producer-host crash, those messages are lost unless persisted.

ref.tell(msg) | ProducerController
No seq, no ack, no retransmit | Reliable delivery contract
Lost on dead recipient | Survives transient failures
Sub-microsecond cost | ~50µs per message overhead

Use the controller only for streams where loss is unacceptable; raw tell for everything else.