跳转到内容
简体中文

AMQP (RabbitMQ)

此内容尚不支持你的语言。

AmqpActor integrates with RabbitMQ (and other AMQP 0.9.1 brokers). Supports exchanges (direct / topic / fanout / headers), queues, routing keys, ack/nack semantics.

import { ActorSystem, Props, AmqpActor } from 'actor-ts';
const amqp = system.spawn(
Props.create(() => new AmqpActor({
url: 'amqp://guest:guest@rabbitmq:5672',
// Queue → consumer-actor wiring lives in `bindings` on the
// settings — declared once when the actor connects. Runtime
// subscribe / unsubscribe isn't a v1 surface; if you need to
// attach a new consumer after start, restart the actor with
// updated bindings.
bindings: [
{ queue: 'order-processor', exchange: 'orders', routingKey: 'placed.#', target: handler },
],
})),
'amqp',
);
// Publish to an exchange:
amqp.tell({
kind: 'publish',
publish: {
exchange: 'orders',
routingKey: 'placed.priority',
content: JSON.stringify(order),
},
});
interface AmqpActorSettings extends BrokerCommonSettings {
url: string; // amqp:// or amqps://
prefetch?: number; // unacked messages per consumer; default 1
autoAck?: boolean; // ack on delivery vs after processing; default true
bindings?: ReadonlyArray<AmqpQueueBinding>; // queue ↔ exchange ↔ routingKey ↔ target
}
interface AmqpQueueBinding {
queue: string;
exchange?: string; // omit for the default exchange
routingKey?: string;
target: ActorRef<AmqpDelivery>; // consumer-actor for this queue
}

Bindings declare the queue, the exchange binding, AND the consumer-actor target in one go — the actor wires everything up on connect (idempotent re-declaration).

TypeRouting
directExact match on routing key.
topicWildcard pattern match (* = one segment, # = many).
fanoutAll bound queues, ignoring routing key.
headersMatch on message headers, not routing key.
exchanges: [
{ name: 'work', type: 'direct', durable: true }, // exact routing
{ name: 'audit', type: 'topic', durable: true }, // pattern match
{ name: 'notify', type: 'fanout', durable: false }, // broadcast
];

Most common pattern: topic exchanges with hierarchical routing keys (orders.placed.priority, orders.cancelled, payments.failed).

Set autoAck: false in settings, then the handler tells the result back via { kind: 'ack', delivery } or { kind: 'nack', delivery, requeue } on the AMQP actor’s ref.

class OrderHandler extends Actor<AmqpDelivery> {
constructor(private readonly amqp: ActorRef<AmqpCmd>) { super(); }
override async onReceive(delivery: AmqpDelivery): Promise<void> {
try {
await processOrder(delivery);
this.amqp.tell({ kind: 'ack', delivery });
} catch (err) {
this.amqp.tell({ kind: 'nack', delivery, requeue: true });
}
}
}

Every inbound AmqpDelivery carries an opaque ackToken the actor uses to identify the broker-side message — you don’t touch it directly, just pass the whole delivery back in the ack / nack tell.

The broker holds the message in unacked state until your handler responds — if the consumer crashes, the broker re-delivers on the next connection. Use requeue: false to send to a dead-letter queue (if configured) — useful when retry won’t help.

prefetch: 10 // consume up to 10 unacked at once per consumer

The default (per AMQP spec) is unlimited — the broker would push every message in the queue. Set prefetch to a sensible batch size (10-50) to avoid overwhelming the consumer.

Terminal window
npm install amqplib
# or: bun add amqplib

Three primary scenarios:

  1. Work queues — multiple consumers competing for messages on a queue, one consumer wins per message.
  2. Routing + filtering — topic exchanges with rich routing keys.
  3. Bridging existing RabbitMQ infrastructure.

For high-throughput streaming, Kafka scales better. For lightweight pub/sub, NATS or MQTT is lighter. AMQP shines for enterprise-style queue + routing where features (DLQ, TTL, prefetch, ack semantics) matter.