Skip to content

AMQP (RabbitMQ)

AmqpActor integrates with RabbitMQ (and other AMQP 0.9.1 brokers). Supports exchanges (direct / topic / fanout / headers), queues, routing keys, ack/nack semantics.

import { ActorSystem, Props, AmqpActor } from 'actor-ts';
const amqp = system.actorOf(
Props.create(() => new AmqpActor({
url: 'amqp://guest:guest@rabbitmq:5672',
exchanges: [
{ name: 'orders', type: 'topic', durable: true },
],
queues: [
{ name: 'order-processor', durable: true, bindings: [
{ exchange: 'orders', routingKey: 'placed.#' },
]},
],
})),
'amqp',
);
// Subscribe to consume from a queue:
amqp.tell({ kind: 'subscribe-queue', queue: 'order-processor', subscriber: handler });
// Publish to an exchange:
amqp.tell({
kind: 'publish',
exchange: 'orders',
routingKey: 'placed.priority',
payload: JSON.stringify(order),
});
interface AmqpActorSettings extends BrokerCommonSettings {
url: string; // amqp:// or amqps://
exchanges?: ExchangeDefinition[]; // declared on connect
queues?: QueueDefinition[]; // declared + bound on connect
prefetch?: number; // consumer prefetch count
}
interface ExchangeDefinition {
name: string;
type: 'direct' | 'topic' | 'fanout' | 'headers';
durable?: boolean;
autoDelete?: boolean;
}
interface QueueDefinition {
name: string;
durable?: boolean;
exclusive?: boolean;
autoDelete?: boolean;
bindings?: Array<{ exchange: string; routingKey?: string; args?: Record<string, unknown> }>;
}

The actor declares exchanges + queues + bindings on connect (idempotent — re-declaration of identical definitions is fine).

TypeRouting
directExact match on routing key.
topicWildcard pattern match (* = one segment, # = many).
fanoutAll bound queues, ignoring routing key.
headersMatch on message headers, not routing key.
exchanges: [
{ name: 'work', type: 'direct', durable: true }, // exact routing
{ name: 'audit', type: 'topic', durable: true }, // pattern match
{ name: 'notify', type: 'fanout', durable: false }, // broadcast
];

Most common pattern: topic exchanges with hierarchical routing keys (orders.placed.priority, orders.cancelled, payments.failed).

class OrderHandler extends Actor<AmqpMessage> {
override async onReceive(msg: AmqpMessage): Promise<void> {
try {
await processOrder(msg);
msg.ack(); // tell the broker the message was processed
} catch (err) {
msg.nack({ requeue: true }); // put back on the queue
}
}
}

Every inbound message has ack() / nack(opts) methods. The broker holds the message in unacked state until you call one — if the consumer crashes, the broker re-delivers on the next connection.

Use nack({ requeue: false }) to send to a dead-letter queue (if configured) — useful when retry won’t help.

prefetch: 10 // consume up to 10 unacked at once per consumer

The default (per AMQP spec) is unlimited — the broker would push every message in the queue. Set prefetch to a sensible batch size (10-50) to avoid overwhelming the consumer.

Terminal window
npm install amqplib
# or: bun add amqplib

Three primary scenarios:

  1. Work queues — multiple consumers competing for messages on a queue, one consumer wins per message.
  2. Routing + filtering — topic exchanges with rich routing keys.
  3. Bridging existing RabbitMQ infrastructure.

For high-throughput streaming, Kafka scales better. For lightweight pub/sub, NATS or MQTT is lighter. AMQP shines for enterprise-style queue + routing where features (DLQ, TTL, prefetch, ack semantics) matter.