ConsumerController
ConsumerController is the receiver side of reliable delivery.
It:
- Receives messages from one or more ProducerControllers.
- Orders them by sequence number.
- Drops duplicates (same seq).
- Delivers in order to a delegate actor.
- Sends acks back to the producer after processing.
```typescript
import { ConsumerController } from 'actor-ts';

class OrderProcessor extends Actor<DeliveryEnvelope<OrderEvent>> {
  override async onReceive(msg: DeliveryEnvelope<OrderEvent>): Promise<void> {
    await this.process(msg.payload);
    msg.ack(); // acknowledge to the producer
  }
}

const processor = system.actorOf(Props.create(() => new OrderProcessor()));

const consumer = system.actorOf(
  ConsumerController.props<OrderEvent>({
    consumerId: 'order-consumer',
    delegate: processor,
  }),
);
```

The producer’s messages flow:
```
ProducerController → ConsumerController → Order in seq → OrderProcessor.onReceive
                                                                  │
                                          ack ◄──────────────────┤
                                          (back to producer)
```

Configuration

```typescript
interface ConsumerControllerSettings<T> {
  consumerId: string;
  delegate: ActorRef<DeliveryEnvelope<T>>;
  bufferSize?: number; // default 100
}
```

| Field | Purpose |
|---|---|
| `consumerId` | Stable identifier, needed for recovery. |
| `delegate` | The actor that does the actual work. Receives `DeliveryEnvelope<T>`. |
| `bufferSize` | Internal buffer for out-of-order arrivals (default 100). |
DeliveryEnvelope
```typescript
interface DeliveryEnvelope<T> {
  readonly payload: T;
  readonly seq: number;
  readonly producerId: string;
  ack(): void;
  nack(reason?: string): void;
}
```

The delegate’s `onReceive` gets the envelope. Call `ack()` after processing succeeds; the producer can then release the buffered message.
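When unit-testing a delegate, it can help to build envelopes by hand and record whether they were acked. The sketch below assumes only the `DeliveryEnvelope` shape shown above; `makeEnvelope`, `EnvelopeOutcome`, and `handle` are hypothetical names for illustration and are not part of actor-ts.

```typescript
// Shape copied from the DeliveryEnvelope interface in this section.
interface DeliveryEnvelope<T> {
  readonly payload: T;
  readonly seq: number;
  readonly producerId: string;
  ack(): void;
  nack(reason?: string): void;
}

// Hypothetical record of what the delegate did with the envelope.
interface EnvelopeOutcome {
  acked: boolean;
  nacked: boolean;
  nackReason: string | undefined;
}

// Hypothetical test helper: builds an envelope whose ack/nack calls are recorded.
function makeEnvelope<T>(
  payload: T,
  seq: number,
  producerId: string = 'test-producer',
): { envelope: DeliveryEnvelope<T>; outcome: EnvelopeOutcome } {
  const outcome: EnvelopeOutcome = { acked: false, nacked: false, nackReason: undefined };
  const envelope: DeliveryEnvelope<T> = {
    payload,
    seq,
    producerId,
    ack: () => { outcome.acked = true; },
    nack: (reason?: string) => {
      outcome.nacked = true;
      outcome.nackReason = reason;
    },
  };
  return { envelope, outcome };
}

// Example handler: acks only after the (trivial) processing step succeeds.
function handle(msg: DeliveryEnvelope<{ orderId: string }>): void {
  if (msg.payload.orderId === '') {
    msg.nack('empty orderId');
    return;
  }
  msg.ack();
}

const good = makeEnvelope({ orderId: 'o-1' }, 1);
handle(good.envelope);

const bad = makeEnvelope({ orderId: '' }, 2);
handle(bad.envelope);
```

After these calls, `good.outcome.acked` is `true` and `bad.outcome.nackReason` is `'empty orderId'`, so a test can assert on the outcome without a running actor system.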
Ack vs nack
```typescript
override onReceive(msg: DeliveryEnvelope<OrderEvent>): void {
  try {
    this.process(msg.payload);
    msg.ack();
  } catch (e) {
    msg.nack(`error: ${(e as Error).message}`);
    // Producer will retransmit
  }
}
```

- `ack` releases the message from the producer’s buffer.
- `nack` tells the producer “I failed; retransmit when ready.”

The framework’s default `ConsumerController` and `ProducerController` treat `nack` as “wait + retransmit”; custom setups might treat it differently.
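The handler above is synchronous. If processing is asynchronous, the promise has to be awaited inside the `try` block; otherwise a rejection bypasses the `catch` and `nack` is never called. A minimal sketch, using a cut-down envelope type and a hypothetical `handleAsync` helper (neither is part of actor-ts):

```typescript
// Cut-down envelope: only the members this sketch needs.
interface Envelope<T> {
  readonly payload: T;
  ack(): void;
  nack(reason?: string): void;
}

// Hypothetical helper: acks after the awaited work resolves, nacks if it rejects.
async function handleAsync<T>(
  msg: Envelope<T>,
  work: (payload: T) => Promise<void>,
): Promise<void> {
  try {
    await work(msg.payload); // awaiting here is what routes rejections into catch
    msg.ack();
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    msg.nack(`error: ${reason}`);
  }
}

// Record ack/nack calls so the behaviour is observable.
const log: string[] = [];
const mk = (payload: string): Envelope<string> => ({
  payload,
  ack: () => { log.push(`ack:${payload}`); },
  nack: (reason?: string) => { log.push(`nack:${payload}:${reason}`); },
});

const finished = handleAsync(mk('ok'), async () => {})
  .then(() => handleAsync(mk('boom'), async () => { throw new Error('db down'); }));
```

Once `finished` resolves, `log` holds one `ack` entry for `'ok'` and one `nack` entry for `'boom'` carrying the error message.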
Idempotent handling
```typescript
class IdempotentConsumer extends Actor<DeliveryEnvelope<OrderEvent>> {
  private highWatermark = 0;

  override onReceive(msg: DeliveryEnvelope<OrderEvent>): void {
    if (msg.seq <= this.highWatermark) {
      // Already processed; ack to release the producer
      msg.ack();
      return;
    }
    this.process(msg.payload);
    this.highWatermark = msg.seq;
    msg.ack();
  }
}
```

Deduplication relies on `highWatermark`: any envelope whose `seq` is at or below it has already been processed. Combined with persistence of the watermark, this gives effectively-once processing.
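To illustrate what persisting the watermark could look like, here is a minimal sketch outside the actor framework. `WatermarkStore`, `InMemoryStore`, and `PersistentIdempotentHandler` are hypothetical names; a real store might be a PersistentActor or a database row. Sequence numbers are assumed to start at 1.

```typescript
// Cut-down envelope: only the members this sketch needs.
interface Envelope<T> {
  readonly payload: T;
  readonly seq: number;
  ack(): void;
}

// Hypothetical persistence boundary for the watermark.
interface WatermarkStore {
  load(): number;
  save(seq: number): void;
}

// Stand-in for durable storage; survives "restarts" within this script.
class InMemoryStore implements WatermarkStore {
  private value = 0;
  load(): number { return this.value; }
  save(seq: number): void { this.value = seq; }
}

class PersistentIdempotentHandler<T> {
  readonly processed: T[] = [];
  private highWatermark: number;
  private readonly store: WatermarkStore;

  constructor(store: WatermarkStore) {
    this.store = store;
    this.highWatermark = store.load(); // recover the watermark on start
  }

  handle(msg: Envelope<T>): void {
    if (msg.seq <= this.highWatermark) {
      msg.ack(); // already processed; just release the producer
      return;
    }
    this.processed.push(msg.payload); // the "process" step
    this.highWatermark = msg.seq;
    this.store.save(msg.seq); // persist before acking
    msg.ack();
  }
}

const store = new InMemoryStore();
let acks = 0;
const env = (seq: number, payload: string): Envelope<string> => ({
  seq,
  payload,
  ack: () => { acks += 1; },
});

const first = new PersistentIdempotentHandler<string>(store);
first.handle(env(1, 'a'));
first.handle(env(2, 'b'));

// Simulated restart: new instance, same store; the producer retransmits seq 2.
const restarted = new PersistentIdempotentHandler<string>(store);
restarted.handle(env(2, 'b')); // skipped, but still acked
restarted.handle(env(3, 'c'));
```

Processing and saving the watermark are two separate steps here, so a crash between them would still cause one reprocessing. Closing that window requires making the side effect and the watermark update atomic, for example in a single transaction.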
Out-of-order handling
```
Producer sends 1, 2, 3
Network delivers them as: 1, 3, 2
ConsumerController buffers 3 (waiting for 2)
When 2 arrives → deliver 2, then deliver 3
```

The internal buffer holds out-of-order messages until the gap fills. It is bounded by `bufferSize`.

If gaps don’t fill (producer crashed, message permanently lost):

```
Held in buffer until bufferSize exceeded
→ oldest gap-blocking message is dropped (silently)
→ next messages can proceed
```

For workloads where this matters, rely on the producer’s retransmit logic: the producer keeps sending unacked messages.
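The buffering behaviour described above can be sketched as a small reorder buffer. This is an illustration, not the ConsumerController's actual implementation: `ReorderBuffer` is a hypothetical class, sequence numbers are assumed to start at 1, and the overflow policy (skip the missing seq and resume from the lowest buffered one) is one possible reading of "dropped, next messages can proceed".

```typescript
class ReorderBuffer<T> {
  private nextSeq = 1; // next sequence number owed to the delegate
  private readonly pending = new Map<number, T>();
  private readonly bufferSize: number;
  private readonly deliver: (seq: number, payload: T) => void;

  constructor(bufferSize: number, deliver: (seq: number, payload: T) => void) {
    this.bufferSize = bufferSize;
    this.deliver = deliver;
  }

  receive(seq: number, payload: T): void {
    // Already delivered or already buffered: treat as a duplicate and ignore.
    if (seq < this.nextSeq || this.pending.has(seq)) return;
    this.pending.set(seq, payload);

    // Assumed overflow policy: give up on the gap and jump to the lowest buffered seq.
    if (this.pending.size > this.bufferSize) {
      this.nextSeq = Math.min(...Array.from(this.pending.keys()));
    }
    this.flush();
  }

  // Deliver every consecutive message starting at nextSeq.
  private flush(): void {
    while (this.pending.has(this.nextSeq)) {
      const payload = this.pending.get(this.nextSeq) as T;
      this.pending.delete(this.nextSeq);
      this.deliver(this.nextSeq, payload);
      this.nextSeq += 1;
    }
  }
}

const delivered: number[] = [];
const buf = new ReorderBuffer<string>(2, (seq) => { delivered.push(seq); });

buf.receive(1, 'a'); // in order: delivered immediately
buf.receive(3, 'c'); // buffered, waiting for 2
buf.receive(2, 'b'); // gap filled: delivers 2, then 3
buf.receive(3, 'c'); // duplicate: ignored

buf.receive(5, 'e'); // buffered, waiting for 4
buf.receive(6, 'f'); // buffered (2 pending, at the limit)
buf.receive(7, 'g'); // limit exceeded: 4 is skipped, 5..7 are delivered
```

With `bufferSize` set to 2, the delegate sees 1, 2, 3, 5, 6, 7; seq 4 is skipped without any signal, which is why the text recommends relying on producer retransmits when loss matters.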
Failure semantics
```
Consumer crashes during process:
  - Message wasn't acked
  - Producer retransmits after timeout
  - New consumer (or restarted same consumer) processes again
  - Idempotent handler dedupes via highWatermark
```

Delivery is at-least-once; idempotency is your responsibility.
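The crash-and-retransmit sequence can be simulated without the framework. In the sketch below, `FakeProducer` and `consume` are hypothetical stand-ins, the retransmit timeout is replaced by an explicit `retransmit()` call, and the watermark is assumed to survive the crash (as if it had been persisted).

```typescript
interface Envelope {
  readonly seq: number;
  readonly payload: string;
  ack(): void;
}

// Hypothetical producer stand-in: keeps every message until it is acked.
class FakeProducer {
  private readonly unacked = new Map<number, string>();
  private nextSeq = 1;

  send(payload: string): void {
    this.unacked.set(this.nextSeq, payload);
    this.nextSeq += 1;
  }

  // Stands in for the retransmit timeout: re-deliver all unacked messages in seq order.
  retransmit(deliverTo: (msg: Envelope) => void): void {
    const seqs = Array.from(this.unacked.keys()).sort((a, b) => a - b);
    for (const seq of seqs) {
      const payload = this.unacked.get(seq) as string;
      deliverTo({ seq, payload, ack: () => { this.unacked.delete(seq); } });
    }
  }

  pendingCount(): number {
    return this.unacked.size;
  }
}

const sideEffects: string[] = [];
let highWatermark = 0; // assumed to survive the crash
let crashOnSeq: number | null = 2;

function consume(msg: Envelope): void {
  if (msg.seq <= highWatermark) {
    msg.ack(); // duplicate from a retransmit: ack without reprocessing
    return;
  }
  sideEffects.push(msg.payload); // the "process" step
  highWatermark = msg.seq;
  if (msg.seq === crashOnSeq) {
    crashOnSeq = null;
    throw new Error('simulated crash after processing, before ack');
  }
  msg.ack();
}

const producer = new FakeProducer();
producer.send('a');
producer.send('b');
producer.send('c');

try {
  producer.retransmit(consume); // first delivery attempt; crashes on seq 2
} catch (_e) {
  // The consumer went down; seq 2 and 3 are still unacked on the producer.
}
producer.retransmit(consume); // after the timeout: seq 2 is deduped, seq 3 is processed
```

Each payload produces exactly one side effect even though seq 2 was delivered twice. Without the watermark check, `'b'` would be processed a second time, which is the at-least-once behaviour the text warns about.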
Multiple producers
```typescript
const consumer = system.actorOf(ConsumerController.props({
  consumerId: 'multi-input',
  delegate: processor,
}));

// Multiple producers send to the same consumer:
const p1 = system.actorOf(ProducerController.props({ consumer, producerId: 'p1' }));
const p2 = system.actorOf(ProducerController.props({ consumer, producerId: 'p2' }));
```

Each producer maintains its own seq. The consumer dedupes per producer, keeping one `highWatermark` per `producerId`:

```typescript
class MultiProducerConsumer<T> extends Actor<DeliveryEnvelope<T>> {
  private watermarks = new Map<string, number>();

  override onReceive(msg: DeliveryEnvelope<T>): void {
    const hwm = this.watermarks.get(msg.producerId) ?? 0;
    if (msg.seq <= hwm) {
      // Already processed for this producer; ack to release it
      msg.ack();
      return;
    }
    this.process(msg.payload);
    this.watermarks.set(msg.producerId, msg.seq);
    msg.ack();
  }
}
```

Cluster-aware
The delegate can be a remote actor. The consumer controller routes envelopes via the cluster transport; serialization happens at the cluster boundary.

In practice, keep the consumer and delegate co-located: a cross-cluster delegate ref adds unnecessary overhead.
Where to next
- Delivery overview: the bigger picture.
- ProducerController: the sender side.
- Ack semantics: when acks fire.
- PersistentActor: for persisting the highWatermark.