ConsumerController

ConsumerController is the receiver side of reliable delivery. It:

  • Receives Delivery<T> envelopes from one or more ProducerControllers.
  • Deduplicates deliveries (same producerId + seq).
  • Invokes a user handler function with each message body.
  • Acks the producer automatically after the handler resolves; if the handler throws or rejects, no Ack is sent and the producer retransmits.

import { Props, ConsumerController } from 'actor-ts';

// `system`, `OrderEvent`, and `processOrder` are assumed to be defined elsewhere.
const consumer = system.spawn(
  Props.create(() => new ConsumerController<OrderEvent>({
    handler: async (order) => {
      await processOrder(order);
    },
  })),
);

Message flow (diagram): ProducerController → ConsumerController → handler(body); the ConsumerController sends an automatic Ack back to the ProducerController when the handler resolves.

interface ConsumerControllerSettings<T> {
  handler: (body: T) => void | Promise<void>;
}

The only setting is the handler. Per-producer dedup state is managed internally — you don’t configure a buffer size or consumer ID.

The controller calls handler(body) once per new (producerId, seq) pair. After the handler resolves, the controller sends an Ack back to the producer (via Delivery.replyTo) and the producer releases its in-flight slot.

new ConsumerController<OrderEvent>({
  handler: async (order) => {
    await processOrder(order);
    // Ack is sent automatically after this returns.
  },
});

Handler throws or rejects → no Ack → producer retransmits

new ConsumerController<OrderEvent>({
  handler: async (order) => {
    if (!isValid(order)) {
      throw new Error('invalid order');
    }
    await processOrder(order);
  },
});

Throwing (or returning a rejected promise) prevents the Ack. The producer’s resendTimeoutMs timer fires and re-sends the same seq. This is the only “nack” mechanism — there is no explicit nack message.
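
The retransmit-on-failure behaviour can be modelled without the library. The following is a minimal sketch, not the actor-ts implementation: `deliverOnce` is a hypothetical stand-in for the controller, `sendUntilAcked` is a hypothetical stand-in for the producer's resend timer, and `maxAttempts` exists only so the sketch terminates. The real producer waits resendTimeoutMs between attempts; this model retries immediately.

```typescript
// Hypothetical model of "no Ack on throw"; none of these names come from actor-ts.
type Handler<T> = (body: T) => void | Promise<void>;

// Stand-in for the controller: resolves to true (Ack) only if the handler resolves.
async function deliverOnce<T>(handler: Handler<T>, body: T): Promise<boolean> {
  try {
    await handler(body);
    return true; // handler resolved, so an Ack would be sent
  } catch {
    return false; // handler threw or rejected, so no Ack is sent
  }
}

// Stand-in for the producer: re-sends the same message until it is acked.
// maxAttempts is an assumption of this sketch, used only to guarantee termination.
async function sendUntilAcked<T>(
  handler: Handler<T>,
  body: T,
  maxAttempts: number,
): Promise<number> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    if (await deliverOnce(handler, body)) return attempt;
  }
  throw new Error('gave up after ' + maxAttempts + ' attempts');
}

// A handler that fails on its first call and succeeds on the second.
let calls = 0;
const flakyHandler: Handler<string> = async () => {
  calls += 1;
  if (calls === 1) throw new Error('transient failure');
};

// Resolves to the attempt number on which the message was finally acked.
const attemptsPromise = sendUntilAcked(flakyHandler, 'order-1', 5);
```

Because a throw is the only negative signal, a handler that always throws for a given message (such as the invalid-order case above) will see that message again on every retransmit.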

When the same (producerId, seq) arrives again (e.g. the producer retransmitted before our Ack reached it), the controller skips the handler and re-sends the Ack. No application code needed — the controller tracks contiguous + out-of-order seqs per producer internally.

For business-level idempotency (e.g. “don’t charge a credit card twice even across consumer restarts”), persist your own processed-seq alongside the business state — the controller’s in-memory dedup is lost on restart.
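
A minimal sketch of that pattern follows. It assumes the message body itself carries a stable identifier (a hypothetical `orderId` field), since the handler receives only the body. The `OrderEvent` shape, `processedOrders`, and `chargeCount` are illustrative names, and the in-memory Map stands in for a durable store; in a real system the processed marker and the business write would be committed together.

```typescript
// Hypothetical types and store; a Map stands in for a durable database table.
interface OrderEvent {
  orderId: string; // assumed stable identifier carried in the body
  amountCents: number;
}

const processedOrders = new Map<string, number>(); // orderId -> charged amount
let chargeCount = 0; // counts how often the side effect actually ran

// Idempotent handler: a redelivered order is detected and skipped.
async function handleOrder(order: OrderEvent): Promise<void> {
  if (processedOrders.has(order.orderId)) {
    return; // already processed; resolving lets the controller send its Ack
  }
  chargeCount += 1; // stand-in for the real side effect (e.g. charging a card)
  // In a real store, record this marker in the same transaction as the side effect.
  processedOrders.set(order.orderId, order.amountCents);
}

// Simulate a redelivery after a consumer restart: the handler is invoked twice.
const order: OrderEvent = { orderId: 'o-42', amountCents: 1999 };
const done = handleOrder(order).then(() => handleOrder(order));
```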

Producer sends 1, 2, 3
Network delivers them as: 1, 3, 2
After 1 arrives → { contiguous: 1, above: {} }   (handler ran for seq 1)
After 3 arrives → { contiguous: 1, above: {3} }  (handler ran for seq 3)
After 2 arrives → { contiguous: 3, above: {} }   (handler ran for seq 2,
                                                  then contiguous slides to 3)

The controller keeps a contiguous high-watermark plus a set of out-of-order seqs already delivered above it. Every newly arrived seq is checked against both — duplicates skip the handler and re-Ack; new seqs run the handler then update state.
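
The watermark-plus-set bookkeeping described above can be sketched as a small class. This is an illustration under stated assumptions, not the library's source: `SeqTracker` and its methods are hypothetical names, and the sketch assumes seqs start at 1, so the watermark starts at 0.

```typescript
// Hypothetical sketch of the dedup state for ONE producer; not the actor-ts source.
class SeqTracker {
  private contiguous = 0; // every seq <= contiguous has been handled (assumes seqs start at 1)
  private above = new Set<number>(); // handled seqs greater than contiguous

  // True if this seq was already handled, i.e. the delivery is a duplicate.
  isDuplicate(seq: number): boolean {
    return seq <= this.contiguous || this.above.has(seq);
  }

  // Record a seq as handled, then slide the watermark over any gap it filled.
  markHandled(seq: number): void {
    if (this.isDuplicate(seq)) return;
    this.above.add(seq);
    // Set.delete returns true only if the element was present.
    while (this.above.delete(this.contiguous + 1)) {
      this.contiguous += 1;
    }
  }

  // Plain-data view of the state, with `above` sorted ascending.
  snapshot(): { contiguous: number; above: number[] } {
    return {
      contiguous: this.contiguous,
      above: Array.from(this.above).sort((a, b) => a - b),
    };
  }
}

// Arrival order 1, 3, 2:
const tracker = new SeqTracker();
tracker.markHandled(1);
tracker.markHandled(3);
const afterThree = tracker.snapshot(); // seq 3 is parked in `above`
tracker.markHandled(2);
const afterTwo = tracker.snapshot(); // gap filled, watermark slides to 3
```

Keeping only the watermark and the sparse set means memory grows with the number of gaps rather than with the total number of messages seen.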

There is no fixed buffer size and no silent drop — the producer keeps retransmitting unacked messages until the consumer’s handler resolves and the Ack lands.

Consumer crashes during the handler:
- Handler never resolved → no Ack was sent
- Producer retransmits after `resendTimeoutMs`
- New consumer (or restarted same consumer) sees the seq again
- Controller's in-memory dedup state is gone, so the handler runs again

At-least-once delivery; idempotency is your responsibility. For consumer crashes that must not re-process work, persist the processed-seq alongside business state in your handler.

const consumer = system.spawn(
  Props.create(() => new ConsumerController<OrderEvent>({
    handler: async (order) => { await processOrder(order); },
  })),
);

// Multiple producers send to the same consumer:
const p1 = system.spawnAnonymous(Props.create(() =>
  new ProducerController({ consumer, producerId: 'p1' })));
const p2 = system.spawnAnonymous(Props.create(() =>
  new ProducerController({ consumer, producerId: 'p2' })));

Each producer maintains its own seq. The consumer dedups per producer automatically — it keeps a separate (contiguous, above) dedup state per producerId, so two producers’ seq spaces don’t collide.
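
Why keying by producerId prevents collisions can be shown with a small model. This is a sketch under assumptions, not the controller's code: `seenByProducer` and `isNewDelivery` are hypothetical names, and a plain Set of seen seqs stands in for the (contiguous, above) pair to keep the example short.

```typescript
// Hypothetical sketch: dedup state keyed by producerId, so seq spaces stay separate.
// A plain Set of seen seqs is a simplification of the (contiguous, above) state.
const seenByProducer = new Map<string, Set<number>>();

// Returns true when (producerId, seq) is new and the handler should run.
function isNewDelivery(producerId: string, seq: number): boolean {
  let seen = seenByProducer.get(producerId);
  if (seen === undefined) {
    seen = new Set<number>();
    seenByProducer.set(producerId, seen);
  }
  if (seen.has(seq)) return false; // duplicate from the same producer
  seen.add(seq);
  return true;
}

const results = [
  isNewDelivery('p1', 1), // new
  isNewDelivery('p2', 1), // new: same seq, different producer
  isNewDelivery('p1', 1), // duplicate of p1's seq 1
];
```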

Your handler sees only the body — producerId and seq are the controller’s concern. If you need them for business logic, persist a wrapper that carries them through your processing pipeline.

The producer’s consumer ref can point at a remote consumer (different cluster node). The cluster transport serialises the envelopes; the Ack path uses the same transport in reverse.

In practice: place the consumer near its handler’s data. If the handler hits a local database, run the consumer on the same node as the database. Cross-node delivery is fine for workloads that are bounded by throughput rather than per-message latency, but it adds a network round trip per Ack.