ConsumerController
ConsumerController is the receiver side of reliable delivery.
It:
- Receives Delivery<T> envelopes from one or more ProducerControllers.
- Dedups duplicates (same producerId + seq).
- Invokes a user handler function with each message body.
- Acks back to the producer automatically after the handler resolves; if the handler throws, no ack — the producer retransmits.
```ts
import { Props, ConsumerController } from 'actor-ts';

const consumer = system.spawn(
  Props.create(() => new ConsumerController<OrderEvent>({
    handler: async (order) => {
      await processOrder(order);
    },
  })),
);
```

The producer’s messages flow to the consumer as Delivery<T> envelopes, and Acks flow back to the producer.
Configuration
```ts
interface ConsumerControllerSettings<T> {
  handler: (body: T) => void | Promise<void>;
}
```

The only setting is the handler. Per-producer dedup state is managed internally; you don’t configure a buffer size or consumer ID.
Handler contract
The controller calls handler(body) once per new (producerId, seq) pair. After the handler resolves, the controller sends an
Ack back to the producer (via Delivery.replyTo) and the producer
releases its in-flight slot.
```ts
new ConsumerController<OrderEvent>({
  handler: async (order) => {
    await processOrder(order);
    // Ack is sent automatically after this returns.
  },
});
```

Handler throws or rejects → no Ack → producer retransmits
```ts
new ConsumerController<OrderEvent>({
  handler: async (order) => {
    if (!isValid(order)) {
      throw new Error('invalid order');
    }
    await processOrder(order);
  },
});
```

Throwing (or returning a rejected promise) prevents the Ack.
The producer’s resendTimeoutMs timer fires and re-sends the
same seq. This is the only “nack” mechanism — there is no
explicit nack message.
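
The ack-on-success rule can be pictured with a small standalone model. This is a sketch under stated assumptions, not actor-ts code: `Envelope`, `receive`, `sendAck`, and `demo` are hypothetical names introduced for illustration, and the retry loop stands in for the producer's resend timer.

```typescript
// Hypothetical model of the receive path; not the actor-ts implementation.
type Handler<T> = (body: T) => void | Promise<void>;

interface Envelope<T> {
  producerId: string;
  seq: number;
  body: T;
}

// Runs the handler and acks only if it completes without throwing or rejecting.
async function receive<T>(
  envelope: Envelope<T>,
  handler: Handler<T>,
  sendAck: (seq: number) => void,
): Promise<boolean> {
  try {
    await handler(envelope.body); // covers sync throws and rejected promises
    sendAck(envelope.seq);
    return true;
  } catch {
    // No Ack is sent. In the real system the producer's resend timer
    // would fire and deliver the same seq again.
    return false;
  }
}

// Simulated producer: the first attempt fails, the retransmit succeeds.
async function demo(): Promise<{ attempts: number; acked: number[] }> {
  const acked: number[] = [];
  let attempts = 0;
  const flaky: Handler<string> = async () => {
    attempts += 1;
    if (attempts === 1) throw new Error('transient failure');
  };
  const envelope: Envelope<string> = { producerId: 'p1', seq: 1, body: 'order-1' };
  while (!(await receive(envelope, flaky, (seq) => { acked.push(seq); }))) {
    // Retry immediately; a real producer waits for resendTimeoutMs.
  }
  return { attempts, acked };
}
```

Because a missing Ack is the only failure signal, a handler that always throws for a given message would be retried indefinitely in this model, so validation failures that can never succeed are worth handling inside the handler.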
Duplicate handling is automatic
When the same (producerId, seq) arrives again (e.g. the producer
retransmitted before our Ack reached it), the controller skips the
handler and re-sends the Ack. No application code needed — the
controller tracks contiguous + out-of-order seqs per producer
internally.
For business-level idempotency (e.g. “don’t charge a credit card twice even across consumer restarts”), persist your own processed-seq alongside the business state — the controller’s in-memory dedup is lost on restart.
Out-of-order handling
```text
Producer sends 1, 2, 3
Network delivers them as: 1, 3, 2

ConsumerController tracks { contiguous: 1, above: {} } at start
After 3 arrives → { contiguous: 1, above: {3} }   (handler ran for seq 3)
After 2 arrives → { contiguous: 3, above: {} }    (handler ran for seq 2, then contiguous slides to 3)
```

The controller keeps a contiguous high-watermark plus a set of
out-of-order seqs already delivered above it. Every newly
arrived seq is checked against both — duplicates skip the
handler and re-Ack; new seqs run the handler then update state.
There is no fixed buffer size and no silent drop — the producer keeps retransmitting unacked messages until the consumer’s handler resolves and the Ack lands.
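
The watermark-plus-set bookkeeping described above can be sketched as a small class. This is an illustrative model only, not the actor-ts source: `SeqTracker`, `isDuplicate`, `markHandled`, and `snapshot` are hypothetical names, and the sketch assumes seqs start at 1. The controller keeps one such state per producerId.

```typescript
// Hypothetical sketch of the dedup state for a single producer.
class SeqTracker {
  private contiguous = 0;            // every seq <= contiguous has been handled
  private above = new Set<number>(); // handled seqs greater than contiguous

  // A seq is a duplicate if it is at or below the watermark, or already in the set.
  isDuplicate(seq: number): boolean {
    return seq <= this.contiguous || this.above.has(seq);
  }

  // Record a handled seq, then slide the watermark over any gap that is now filled.
  markHandled(seq: number): void {
    if (this.isDuplicate(seq)) return;
    this.above.add(seq);
    while (this.above.delete(this.contiguous + 1)) {
      this.contiguous += 1;
    }
  }

  snapshot(): { contiguous: number; above: number[] } {
    return {
      contiguous: this.contiguous,
      above: Array.from(this.above).sort((a, b) => a - b),
    };
  }
}
```

Feeding it the arrival order 1, 3, 2 walks through the same states as the trace: the set holds 3 while 2 is missing, and the watermark jumps to 3 once 2 is handled. The set only holds seqs that arrived ahead of a gap, so it shrinks as gaps fill.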
Failure semantics
```text
Consumer crashes during the handler:
  - Handler never resolved → no Ack was sent
  - Producer retransmits after `resendTimeoutMs`
  - New consumer (or restarted same consumer) sees the seq again
  - Controller's in-memory dedup state is gone, so the handler runs again
```

At-least-once delivery; idempotency is your responsibility. For consumer crashes that must not re-process work, persist the processed-seq alongside business state in your handler.
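
One way to make a handler safe to re-run is to record a "processed" marker in the same store as the business state. The sketch below is a minimal illustration with assumptions labeled: `OrderEvent`, its `orderId` field, and `OrderStore` are hypothetical, the in-memory `Set` stands in for a durable store, and the dedup key is a business ID carried in the body because the handler does not receive the controller's seq.

```typescript
// Hypothetical message shape; orderId serves as the idempotency key.
interface OrderEvent {
  orderId: string;
  amountCents: number;
}

// Stand-in for a durable store. In a real system the marker and the business
// update would be written in one transaction so they cannot diverge.
class OrderStore {
  private processed = new Set<string>();
  private total = 0;

  // Applies the order only if its key has not been recorded before.
  applyOnce(order: OrderEvent): boolean {
    if (this.processed.has(order.orderId)) return false; // replay: skip the side effect
    this.processed.add(order.orderId);
    this.total += order.amountCents;
    return true;
  }

  get totalCents(): number {
    return this.total;
  }
}

const store = new OrderStore();

// Handler body suitable for ConsumerControllerSettings.handler. It returns
// normally on a replay, so the controller still sends the Ack.
const handler = (order: OrderEvent): void => {
  store.applyOnce(order);
};
```

Returning normally on a replay matters: throwing there would withhold the Ack and cause another retransmit of work that is already done.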
Multiple producers
```ts
const consumer = system.spawn(
  Props.create(() => new ConsumerController<OrderEvent>({
    handler: async (order) => {
      await processOrder(order);
    },
  })),
);

// Multiple producers send to the same consumer:
const p1 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer, producerId: 'p1' })));
const p2 = system.spawnAnonymous(Props.create(() => new ProducerController({ consumer, producerId: 'p2' })));
```

Each producer maintains its own seq. The consumer dedups
per producer automatically — it keeps a separate
(contiguous, above) dedup state per producerId, so two
producers’ seq spaces don’t collide.
Your handler sees only the body — producerId and seq are
the controller’s concern. If you need them for business
logic, persist a wrapper that carries them through your
processing pipeline.
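
A wrapper of that kind might look like the sketch below. Everything here is application-level and hypothetical (`Tracked`, `createStamper`, `appSeq`, `trackedHandler`); in particular, `appSeq` is assigned by the sending code and is not assumed to equal the controller's internal seq.

```typescript
// Hypothetical application-level wrapper; not part of actor-ts.
interface Tracked<T> {
  producerId: string; // chosen by the sending application
  appSeq: number;     // application-assigned, independent of the controller's seq
  payload: T;
}

// Returns a function that stamps each payload with the producer's id and the next appSeq.
function createStamper<T>(producerId: string): (payload: T) => Tracked<T> {
  let next = 1;
  return (payload: T): Tracked<T> => ({ producerId, appSeq: next++, payload });
}

// A handler typed on the wrapper can read both fields from the body.
const seen: string[] = [];
const trackedHandler = (msg: Tracked<{ orderId: string }>): void => {
  seen.push(`${msg.producerId}:${msg.appSeq}:${msg.payload.orderId}`);
};
```

The sender would wrap each payload before handing it to its producer, and the consumer would be declared as a ConsumerController over the wrapped type, so the handler receives the wrapper as its body.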
Cluster-aware
The producer’s consumer ref can point at a remote consumer
(different cluster node). The cluster transport serialises the
envelopes; the Ack path uses the same transport in reverse.
In practice: place the consumer near its handler’s data — if the handler hits a local database, run the consumer on the same node as the database. Cross-cluster delivery is fine for throughput-bounded workloads but adds a round-trip per ack.
Where to next
- Delivery overview — the bigger picture.
- ProducerController — the sender side.
- Ack semantics — when acks fire.
- PersistentActor — for persisting processed-seq state if you need effectively-once handling across consumer restarts.