AMQP (RabbitMQ)
AmqpActor integrates with RabbitMQ (and other AMQP 0.9.1
brokers). Supports exchanges (direct / topic / fanout / headers),
queues, routing keys, ack/nack semantics.
import { ActorSystem, Props, AmqpActor } from 'actor-ts';
const amqp = system.actorOf( Props.create(() => new AmqpActor({ url: 'amqp://guest:guest@rabbitmq:5672', exchanges: [ { name: 'orders', type: 'topic', durable: true }, ], queues: [ { name: 'order-processor', durable: true, bindings: [ { exchange: 'orders', routingKey: 'placed.#' }, ]}, ], })), 'amqp',);
// Subscribe to consume from a queue:amqp.tell({ kind: 'subscribe-queue', queue: 'order-processor', subscriber: handler });
// Publish to an exchange:amqp.tell({ kind: 'publish', exchange: 'orders', routingKey: 'placed.priority', payload: JSON.stringify(order),});Settings
Section titled “Settings”interface AmqpActorSettings extends BrokerCommonSettings { url: string; // amqp:// or amqps:// exchanges?: ExchangeDefinition[]; // declared on connect queues?: QueueDefinition[]; // declared + bound on connect prefetch?: number; // consumer prefetch count}
interface ExchangeDefinition { name: string; type: 'direct' | 'topic' | 'fanout' | 'headers'; durable?: boolean; autoDelete?: boolean;}
interface QueueDefinition { name: string; durable?: boolean; exclusive?: boolean; autoDelete?: boolean; bindings?: Array<{ exchange: string; routingKey?: string; args?: Record<string, unknown> }>;}The actor declares exchanges + queues + bindings on connect (idempotent — re-declaration of identical definitions is fine).
Exchanges and routing
Section titled “Exchanges and routing”| Type | Routing |
|---|---|
direct | Exact match on routing key. |
topic | Wildcard pattern match (* = one segment, # = many). |
fanout | All bound queues, ignoring routing key. |
headers | Match on message headers, not routing key. |
exchanges: [ { name: 'work', type: 'direct', durable: true }, // exact routing { name: 'audit', type: 'topic', durable: true }, // pattern match { name: 'notify', type: 'fanout', durable: false }, // broadcast];Most common pattern: topic exchanges with hierarchical
routing keys (orders.placed.priority, orders.cancelled,
payments.failed).
Ack / nack
Section titled “Ack / nack”class OrderHandler extends Actor<AmqpMessage> { override async onReceive(msg: AmqpMessage): Promise<void> { try { await processOrder(msg); msg.ack(); // tell the broker the message was processed } catch (err) { msg.nack({ requeue: true }); // put back on the queue } }}Every inbound message has ack() / nack(opts) methods. The
broker holds the message in unacked state until you call
one — if the consumer crashes, the broker re-delivers on the
next connection.
Use nack({ requeue: false }) to send to a dead-letter queue
(if configured) — useful when retry won’t help.
Prefetch
Section titled “Prefetch”prefetch: 10 // consume up to 10 unacked at once per consumerThe default (per AMQP spec) is unlimited — the broker would
push every message in the queue. Set prefetch to a sensible
batch size (10-50) to avoid overwhelming the consumer.
Peer dependency
Section titled “Peer dependency”npm install amqplib# or: bun add amqplibWhen to use AMQP
Section titled “When to use AMQP”Three primary scenarios:
- Work queues — multiple consumers competing for messages on a queue, one consumer wins per message.
- Routing + filtering — topic exchanges with rich routing keys.
- Bridging existing RabbitMQ infrastructure.
For high-throughput streaming, Kafka scales better. For lightweight pub/sub, NATS or MQTT is lighter. AMQP shines for enterprise-style queue + routing where features (DLQ, TTL, prefetch, ack semantics) matter.
Where to next
Section titled “Where to next”- I/O overview — the bigger picture.
- BrokerActor base — the shared lifecycle.
- Kafka — for streaming.
- NATS / MQTT — lighter alternatives.