AMQP (RabbitMQ)
AmqpActor integriert mit RabbitMQ (und anderen AMQP-0.9.1-
Brokern). Unterstützt Exchanges (direct / topic / fanout /
headers), Queues, Routing-Keys, ACK/NACK-Semantik.
import { ActorSystem, Props, AmqpActor } from 'actor-ts';
const amqp = system.spawn( Props.create(() => new AmqpActor({ url: 'amqp://guest:guest@rabbitmq:5672', // Queue → Consumer-Actor-Verdrahtung läuft über `bindings` in // den Settings — einmal beim Connect deklariert. Ein // Runtime-Subscribe/Unsubscribe ist kein v1-Surface; wenn Du // nach dem Start einen neuen Consumer anhängen musst, starte // den Actor mit aktualisierten Bindings neu. bindings: [ { queue: 'order-processor', exchange: 'orders', routingKey: 'placed.#', target: handler }, ], })), 'amqp',);
// An einen Exchange publishen:amqp.tell({ kind: 'publish', publish: { exchange: 'orders', routingKey: 'placed.priority', content: JSON.stringify(order), },});Settings
Abschnitt betitelt „Settings“interface AmqpActorSettings extends BrokerCommonSettings { url: string; // amqp:// oder amqps:// prefetch?: number; // ungeackte Nachrichten pro Consumer; Default 1 autoAck?: boolean; // ACK bei Auslieferung vs. nach Verarbeitung; Default true bindings?: ReadonlyArray<AmqpQueueBinding>; // Queue ↔ Exchange ↔ Routing-Key ↔ Target}
interface AmqpQueueBinding { queue: string; exchange?: string; // weglassen für den Default-Exchange routingKey?: string; target: ActorRef<AmqpDelivery>; // Consumer-Actor für diese Queue}Bindings deklarieren die Queue, das Exchange-Binding UND den Consumer-Actor-Target auf einen Schlag — der Actor verdrahtet alles beim Connect (idempotente erneute Deklaration).
Exchanges und Routing
Abschnitt betitelt „Exchanges und Routing“| Typ | Routing |
|---|---|
direct | Exakter Match auf den Routing-Key. |
topic | Wildcard-Pattern-Match (* = ein Segment, # = viele). |
fanout | Alle gebundenen Queues, Routing-Key ignoriert. |
headers | Match auf Nachrichten-Header, nicht Routing-Key. |
exchanges: [ { name: 'work', type: 'direct', durable: true }, // exaktes Routing { name: 'audit', type: 'topic', durable: true }, // Pattern-Match { name: 'notify', type: 'fanout', durable: false }, // Broadcast];Häufigstes Muster: topic-Exchanges mit hierarchischen
Routing-Keys (orders.placed.priority, orders.cancelled,
payments.failed).
ACK / NACK
Abschnitt betitelt „ACK / NACK“Setze autoAck: false in den Settings; dann teilt der Handler das
Ergebnis per { kind: 'ack', delivery } oder { kind: 'nack', delivery, requeue } an die Ref des AMQP-Actors mit.
class OrderHandler extends Actor<AmqpDelivery> { constructor(private readonly amqp: ActorRef<AmqpCmd>) { super(); }
override async onReceive(delivery: AmqpDelivery): Promise<void> { try { await processOrder(delivery); this.amqp.tell({ kind: 'ack', delivery }); } catch (err) { this.amqp.tell({ kind: 'nack', delivery, requeue: true }); } }}Jede eingehende AmqpDelivery trägt einen opaken ackToken, mit
dem der Actor die Broker-seitige Nachricht identifiziert — Du
fasst ihn nicht direkt an, sondern reichst das ganze delivery-
Objekt im ack/nack-Tell wieder zurück.
Der Broker hält die Nachricht im Unacked-Zustand, bis der
Handler antwortet — wenn der Consumer crasht, liefert der Broker
bei der nächsten Verbindung erneut aus. Nutze requeue: false,
um an eine Dead-Letter-Queue zu senden (falls konfiguriert) —
nützlich, wenn ein Retry nicht hilft.
Prefetch
Abschnitt betitelt „Prefetch“prefetch: 10 // bis zu 10 ungeackte gleichzeitig pro Consumer konsumierenDer Default (laut AMQP-Spec) ist unbegrenzt — der Broker würde
jede Nachricht in der Queue pushen. Setze prefetch auf eine
sinnvolle Batch-Größe (10–50), damit der Consumer nicht
überfordert wird.
Peer-Dependency
Abschnitt betitelt „Peer-Dependency“npm install amqplib# oder: bun add amqplibWann AMQP
Abschnitt betitelt „Wann AMQP“Drei primäre Szenarien:
- Work-Queues — mehrere Consumer konkurrieren um Nachrichten auf einer Queue, pro Nachricht gewinnt ein Consumer.
- Routing + Filterung — Topic-Exchanges mit reichen Routing-Keys.
- Brücken zu bestehender RabbitMQ-Infrastruktur.
Für hochvolumiges Streaming skaliert Kafka besser. Für leichtgewichtiges Pub/Sub sind NATS oder MQTT schlanker. AMQP glänzt bei Enterprise-Style Queue + Routing, wo Features (DLQ, TTL, Prefetch, ACK-Semantik) zählen.
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- I/O-Übersicht — das große Bild.
- BrokerActor-Basis — der gemeinsame Lifecycle.
- Kafka — fürs Streaming.
- NATS / MQTT — leichtere Alternativen.