Skip to content

ConsumerController

ConsumerController is the receiver side of reliable delivery. It:

  • Receives messages from one or more ProducerControllers.
  • Orders by sequence number.
  • Dedupes duplicates (same seq).
  • Delivers in order to a delegate actor.
  • Sends acks back to the producer after processing.
import { ConsumerController } from 'actor-ts';
class OrderProcessor extends Actor<DeliveryEnvelope<OrderEvent>> {
override async onReceive(msg: DeliveryEnvelope<OrderEvent>): Promise<void> {
await this.process(msg.payload);
msg.ack(); // acknowledge to the producer
}
}
const processor = system.actorOf(Props.create(() => new OrderProcessor()));
const consumer = system.actorOf(
ConsumerController.props<OrderEvent>({
consumerId: 'order-consumer',
delegate: processor,
}),
);

The producer’s messages flow:

ProducerController → ConsumerController → Order in seq → OrderProcessor.onReceive
ack ◄──────────────────┤
(back to producer)
interface ConsumerControllerSettings<T> {
consumerId: string;
delegate: ActorRef<DeliveryEnvelope<T>>;
bufferSize?: number; // default 100
}
FieldPurpose
consumerIdStable identifier — needed for recovery.
delegateThe actor that does the actual work. Receives DeliveryEnvelope<T>.
bufferSizeInternal buffer for out-of-order arrivals.
interface DeliveryEnvelope<T> {
readonly payload: T;
readonly seq: number;
readonly producerId: string;
ack(): void;
nack(reason?: string): void;
}

The delegate’s onReceive gets the envelope. Call ack() after processing succeeds; the producer can then release the buffered message.

override onReceive(msg: DeliveryEnvelope<OrderEvent>): void {
try {
this.process(msg.payload);
msg.ack();
} catch (e) {
msg.nack(`error: ${(e as Error).message}`);
// Producer will retransmit
}
}

ack releases the message from the producer’s buffer.

nack tells the producer “I failed; retransmit when ready.” The framework’s default ConsumerController + ProducerController treat nack as “wait + retransmit”; some custom setups might treat it differently.

class IdempotentConsumer extends Actor<DeliveryEnvelope<OrderEvent>> {
private highWatermark = 0;
override onReceive(msg: DeliveryEnvelope<OrderEvent>): void {
if (msg.seq <= this.highWatermark) {
// Already processed; ack to release the producer
msg.ack();
return;
}
this.process(msg.payload);
this.highWatermark = msg.seq;
msg.ack();
}
}

Dedup via highWatermark. Combined with persistence of the watermark, this gives effectively-once processing.

Producer sends 1, 2, 3
Network delivers them as: 1, 3, 2
ConsumerController buffers 3 (waiting for 2)
When 2 arrives → deliver 2, then deliver 3

The internal buffer holds out-of-order messages until the gap fills. Bounded by bufferSize.

If gaps don’t fill (producer crashed, message permanently lost):

Held in buffer until bufferSize exceeded
→ oldest gap-blocking message is dropped (silently)
→ next messages can proceed

For workloads where this matters, rely on the producer retransmit logic — the producer will keep sending unacked messages.

Consumer crashes during process:
- Message wasn't acked
- Producer retransmits after timeout
- New consumer (or restarted same consumer) processes again
- Idempotent handler dedupes via highWatermark

At-least-once delivery; idempotency is your responsibility.

const consumer = system.actorOf(ConsumerController.props({
consumerId: 'multi-input',
delegate: processor,
}));
// Multiple producers send to the same consumer:
const p1 = system.actorOf(ProducerController.props({ consumer: consumerRef, producerId: 'p1' }));
const p2 = system.actorOf(ProducerController.props({ consumer: consumerRef, producerId: 'p2' }));

Each producer maintains its own seq. The consumer dedupes per producer (highWatermark per producerId):

class MultiProducerConsumer extends Actor<DeliveryEnvelope<T>> {
private watermarks = new Map<string, number>();
override onReceive(msg: DeliveryEnvelope<T>): void {
const hwm = this.watermarks.get(msg.producerId) ?? 0;
if (msg.seq <= hwm) { msg.ack(); return; }
this.process(msg.payload);
this.watermarks.set(msg.producerId, msg.seq);
msg.ack();
}
}

The delegate can be a remote actor. The consumer controller routes envelopes via the cluster transport; serialization happens at the cluster boundary.

In practice: keep the consumer + delegate co-located — cross-cluster delegate ref has unnecessary overhead.