Zum Inhalt springen
Deutsch

AMQP (RabbitMQ)

AmqpActor integriert mit RabbitMQ (und anderen AMQP-0.9.1- Brokern). Unterstützt Exchanges (direct / topic / fanout / headers), Queues, Routing-Keys, ACK/NACK-Semantik.

import { ActorSystem, Props, AmqpActor } from 'actor-ts';
const amqp = system.spawn(
Props.create(() => new AmqpActor({
url: 'amqp://guest:guest@rabbitmq:5672',
// Queue → Consumer-Actor-Verdrahtung läuft über `bindings` in
// den Settings — einmal beim Connect deklariert. Ein
// Runtime-Subscribe/Unsubscribe ist kein v1-Surface; wenn Du
// nach dem Start einen neuen Consumer anhängen musst, starte
// den Actor mit aktualisierten Bindings neu.
bindings: [
{ queue: 'order-processor', exchange: 'orders', routingKey: 'placed.#', target: handler },
],
})),
'amqp',
);
// An einen Exchange publishen:
amqp.tell({
kind: 'publish',
publish: {
exchange: 'orders',
routingKey: 'placed.priority',
content: JSON.stringify(order),
},
});
interface AmqpActorSettings extends BrokerCommonSettings {
url: string; // amqp:// oder amqps://
prefetch?: number; // ungeackte Nachrichten pro Consumer; Default 1
autoAck?: boolean; // ACK bei Auslieferung vs. nach Verarbeitung; Default true
bindings?: ReadonlyArray<AmqpQueueBinding>; // Queue ↔ Exchange ↔ Routing-Key ↔ Target
}
interface AmqpQueueBinding {
queue: string;
exchange?: string; // weglassen für den Default-Exchange
routingKey?: string;
target: ActorRef<AmqpDelivery>; // Consumer-Actor für diese Queue
}

Bindings deklarieren die Queue, das Exchange-Binding UND den Consumer-Actor-Target auf einen Schlag — der Actor verdrahtet alles beim Connect (idempotente erneute Deklaration).

TypRouting
directExakter Match auf den Routing-Key.
topicWildcard-Pattern-Match (* = ein Segment, # = viele).
fanoutAlle gebundenen Queues, Routing-Key ignoriert.
headersMatch auf Nachrichten-Header, nicht Routing-Key.
exchanges: [
{ name: 'work', type: 'direct', durable: true }, // exaktes Routing
{ name: 'audit', type: 'topic', durable: true }, // Pattern-Match
{ name: 'notify', type: 'fanout', durable: false }, // Broadcast
];

Häufigstes Muster: topic-Exchanges mit hierarchischen Routing-Keys (orders.placed.priority, orders.cancelled, payments.failed).

Setze autoAck: false in den Settings; dann teilt der Handler das Ergebnis per { kind: 'ack', delivery } oder { kind: 'nack', delivery, requeue } an die Ref des AMQP-Actors mit.

class OrderHandler extends Actor<AmqpDelivery> {
constructor(private readonly amqp: ActorRef<AmqpCmd>) { super(); }
override async onReceive(delivery: AmqpDelivery): Promise<void> {
try {
await processOrder(delivery);
this.amqp.tell({ kind: 'ack', delivery });
} catch (err) {
this.amqp.tell({ kind: 'nack', delivery, requeue: true });
}
}
}

Jede eingehende AmqpDelivery trägt einen opaken ackToken, mit dem der Actor die Broker-seitige Nachricht identifiziert — Du fasst ihn nicht direkt an, sondern reichst das ganze delivery- Objekt im ack/nack-Tell wieder zurück.

Der Broker hält die Nachricht im Unacked-Zustand, bis der Handler antwortet — wenn der Consumer crasht, liefert der Broker bei der nächsten Verbindung erneut aus. Nutze requeue: false, um an eine Dead-Letter-Queue zu senden (falls konfiguriert) — nützlich, wenn ein Retry nicht hilft.

prefetch: 10 // bis zu 10 ungeackte gleichzeitig pro Consumer konsumieren

Der Default (laut AMQP-Spec) ist unbegrenzt — der Broker würde jede Nachricht in der Queue pushen. Setze prefetch auf eine sinnvolle Batch-Größe (10–50), damit der Consumer nicht überfordert wird.

Terminal-Fenster
npm install amqplib
# oder: bun add amqplib

Drei primäre Szenarien:

  1. Work-Queues — mehrere Consumer konkurrieren um Nachrichten auf einer Queue, pro Nachricht gewinnt ein Consumer.
  2. Routing + Filterung — Topic-Exchanges mit reichen Routing-Keys.
  3. Brücken zu bestehender RabbitMQ-Infrastruktur.

Für hochvolumiges Streaming skaliert Kafka besser. Für leichtgewichtiges Pub/Sub sind NATS oder MQTT schlanker. AMQP glänzt bei Enterprise-Style Queue + Routing, wo Features (DLQ, TTL, Prefetch, ACK-Semantik) zählen.