
ProducerController

ProducerController wraps a sender’s outgoing messages, assigning sequence numbers and holding each message until the consumer acknowledges it.

```typescript
import { ProducerController } from 'actor-ts';

const producer = system.actorOf(
  ProducerController.props<OrderEvent>({
    producerId: 'orders',      // stable identity
    consumer: consumerRef,
    maxOutstanding: 100,       // backpressure cap
    retransmitDelayMs: 5_000,
  }),
  'producer',
);

producer.tell({ kind: 'send', payload: { orderId: 'o-1' } });
```

Internally:

  • Assigns seq 1 to the first message, 2 to the next, …
  • Sends each via wrapped tell to the consumer.
  • Holds in a buffer until acked.
  • Retransmits if no ack within retransmitDelayMs.
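The sequencing, buffering, and stash-on-backpressure behavior above can be sketched as plain state outside the actor. `SeqBuffer` is a hypothetical name for illustration; the real ProducerController keeps equivalent state internally.

```typescript
// Minimal sketch of the producer's send path: assign seq, buffer until acked,
// stash beyond the cap. Assumes `deliver` stands in for the wrapped tell.
type Envelope<T> = { seq: number; payload: T };

class SeqBuffer<T> {
  private nextSeq = 1;
  private unacked = new Map<number, Envelope<T>>(); // held until acked
  private stash: T[] = [];                          // parked beyond the cap

  constructor(
    private maxOutstanding: number,
    private deliver: (env: Envelope<T>) => void,
  ) {}

  send(payload: T): void {
    if (this.unacked.size >= this.maxOutstanding) {
      this.stash.push(payload); // backpressure: park until acks free the buffer
      return;
    }
    const env = { seq: this.nextSeq++, payload };
    this.unacked.set(env.seq, env);
    this.deliver(env);
  }

  ack(seq: number): void {
    this.unacked.delete(seq);
    const next = this.stash.shift(); // resume a stashed send, if any
    if (next !== undefined) this.send(next);
  }

  // Called on the retransmit timer: resend everything still unacked,
  // reusing each message's original seq.
  retransmitAll(): void {
    for (const env of this.unacked.values()) this.deliver(env);
  }
}
```

Note that a stashed message is not assigned a seq until it is actually sent, so sequence numbers always reflect send order.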
```typescript
interface ProducerControllerSettings<T> {
  producerId: string;          // stable identifier
  consumer: ActorRef<DeliveryEnvelope<T>>;
  maxOutstanding?: number;     // default 100
  retransmitDelayMs?: number;  // default 5000
}
```
| Field | Purpose |
| --- | --- |
| `producerId` | Stable identifier across restarts; needed if you persist state to match across recovery. |
| `consumer` | The ConsumerController ref. |
| `maxOutstanding` | Backpressure: pause sending after N unacked messages. |
| `retransmitDelayMs` | How long to wait for an ack before retransmitting. |
`maxOutstanding: 100`

The producer holds up to 100 unacked messages. Beyond that, incoming { kind: 'send' } messages are stashed — the caller’s tell succeeds, but actual sending pauses until the consumer’s acks free up the buffer.

For senders that need to know about pressure:

```typescript
const result = await ask(producer, { kind: 'send', payload, replyTo: ... }, 30_000);
// ^ replies after the message is at least sent
```

ask-style sends wait for the producer’s “I sent it” reply, giving you a natural backpressure signal.

When an ack doesn’t arrive within retransmitDelayMs:

```text
seq 5 sent at t=0
seq 5 ack never arrives
at t=5000  → retransmit seq 5
at t=10000 → retransmit seq 5
... until ack arrives or producer stops
```

Retransmits use the same seq as the original. The consumer dedups based on the seq.
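Because retransmits reuse the original seq and the producer sends in order, the consumer-side dedup can be as simple as tracking the highest seq already delivered. A hedged sketch, with illustrative names (the real ConsumerController does this internally):

```typescript
// Consumer-side duplicate detection for in-order, seq-numbered envelopes.
class Deduper {
  private delivered = 0; // highest seq handed to the application

  // Returns true if this envelope should be delivered,
  // false if it is a duplicate retransmission.
  accept(seq: number): boolean {
    if (seq <= this.delivered) return false; // already seen: drop
    this.delivered = seq;
    return true;
  }
}
```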

```text
Without persistence:
  producer crashes → buffer lost → unacked messages are gone
  on restart, sending resumes from seq 1 (no resume point)

With ProducerController + PersistentActor wrapper:
  producer crashes → recovery rebuilds buffer + last-acked seq
  on restart, sending resumes from the right seq
```

For full durability, wrap the producer in a PersistentActor pattern — persist outgoing messages before sending; on restart, replay any unacked.

The framework’s plain ProducerController is in-memory — sufficient for ephemeral streams. For durable streams, layer persistence on top.
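The persist-before-send pattern can be sketched against a hypothetical journal interface; none of these names come from actor-ts, they only illustrate the shape of the layering:

```typescript
// Hypothetical journal of unconfirmed outgoing messages.
interface Journal<T> {
  persist(seq: number, payload: T): void;   // write before send
  confirm(seq: number): void;               // remove once acked
  unconfirmed(): Array<{ seq: number; payload: T }>;
}

// Journal the envelope first, then hand it to the transport.
function sendDurably<T>(
  journal: Journal<T>,
  seq: number,
  payload: T,
  deliver: (seq: number, payload: T) => void,
): void {
  journal.persist(seq, payload); // durable before it leaves the node
  deliver(seq, payload);
}

// On restart: replay everything not yet confirmed.
// The consumer dedups on seq, so replaying already-received messages is safe.
function replayUnacked<T>(
  journal: Journal<T>,
  deliver: (seq: number, payload: T) => void,
): void {
  for (const { seq, payload } of journal.unconfirmed()) {
    deliver(seq, payload);
  }
}
```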

```typescript
// One producer per consumer:
const p1 = system.actorOf(ProducerController.props({ consumer: c1Ref, ... }));
const p2 = system.actorOf(ProducerController.props({ consumer: c2Ref, ... }));
```

Each ProducerController is 1:1 with one ConsumerController. For fan-out, multiple producers send to different consumers.

For routing-based fan-out (one logical stream split across N consumers), you’d write a custom router on top.
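Such a router can be a thin layer that picks one of the per-consumer producers per message. A sketch, where `Producerish` is a hypothetical stand-in for an ActorRef to a ProducerController:

```typescript
// Round-robin one logical stream across N per-consumer producers.
type Producerish<T> = { tell: (msg: { kind: 'send'; payload: T }) => void };

class RoundRobinRouter<T> {
  private next = 0;
  constructor(private producers: Producerish<T>[]) {}

  route(payload: T): void {
    const p = this.producers[this.next];
    this.next = (this.next + 1) % this.producers.length;
    p.tell({ kind: 'send', payload });
  }
}
```

A hash-based variant (e.g. on an order ID) would keep related messages on one consumer, preserving per-key ordering; round-robin spreads load but gives up that guarantee.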

ProducerController’s consumer ref can point at a remote consumer (different cluster node). The cluster transport serializes envelopes; retransmissions work the same.

The producer holds the buffer locally — on producer-host crash, those messages are lost unless persisted.

| `ref.tell(msg)` | ProducerController |
| --- | --- |
| No seq, no ack, no retransmit | Reliable delivery contract |
| Lost on dead recipient | Survives transient failures |
| Sub-microsecond cost | ~50µs per message overhead |

Use the controller only for streams where loss is unacceptable; raw tell for everything else.