Zum Inhalt springen
Deutsch

Redis Streams

RedisStreamsActor integriert mit Redis Streams (der Familie XADD / XREAD / XREADGROUP). Die richtige Form, wenn Redis bereits in deiner Infrastruktur steckt und du Streaming willst, ohne Kafka oder NATS aufzusetzen.

import { ActorSystem, Props, RedisStreamsActor } from 'actor-ts';
const redis = system.spawn(
Props.create(() => new RedisStreamsActor({
url: 'redis://localhost:6379',
streams: ['orders', 'payments'],
// Consumer-Group-Verdrahtung + Consumer-Actor-Target leben in
// den Settings — es gibt kein Runtime-`subscribe`-Surface.
consumerGroup: { group: 'order-processors', consumer: 'worker-1' },
target: orderHandler,
})),
'redis',
);
// Publishen (XADD unter der Haube):
redis.tell({
kind: 'publish',
publish: {
stream: 'orders',
fields: { sku: 'book-1', quantity: '2' },
},
});
interface RedisStreamsActorSettings extends BrokerCommonSettings {
url: string; // 'redis://host:6379'
streams?: ReadonlyArray<string>; // Streams, die konsumiert werden
consumerGroup?: {
group: string;
consumer: string;
createIfMissing?: boolean; // Default true
};
blockMs?: number; // XREADGROUP-Block; Default 5_000
target?: ActorRef<RedisStreamEntry>; // Consumer-Actor für eingehende Einträge
}
// Keine consumerGroup → nutzt XREAD
new RedisStreamsActor({ url, streams: ['events'], target: handler });

Liest standardmäßig ab der $ (latest)-Position des Streams — jeder Consumer sieht jede Nachricht nach dem Subscribe. Kein ACK; kein Work-Sharing.

Gut für Broadcast-artige Verteilung, bei der jeder Consumer jede Nachricht verarbeiten soll.

new RedisStreamsActor({
url,
streams: ['orders'],
consumerGroup: { group: 'workers', consumer: 'worker-1' },
target: handler,
});

Consumer-Group-Modus:

  • Work-Sharing — jede Nachricht geht an einen Consumer in der Gruppe.
  • ACK-Semantik — Consumer rufen nach der Verarbeitung XACK auf; ungeackte Nachrichten werden in der Pending Entries List (PEL) geführt und können erneut zugestellt werden.
  • Mehrere Worker — N Actor-Instanzen in derselben Gruppe teilen sich die Last.

Das ist der richtige Modus für Work Queues — Orders gehen an einen Worker, werden verarbeitet, geackt, fertig.

class OrderProcessor extends Actor<RedisStreamEntry> {
constructor(private readonly redis: ActorRef<RedisStreamsCmd>) { super(); }
override async onReceive(entry: RedisStreamEntry): Promise<void> {
const order = JSON.parse(entry.fields.body);
await processOrder(order);
this.redis.tell({ kind: 'ack', stream: entry.stream, id: entry.id });
}
}

Der entry:

interface RedisStreamEntry {
stream: string;
id: string; // z. B. "1684923847-0"
fields: Record<string, string>; // dein XADD-Payload
}

Stream-IDs sind <millis>-<seq>-Strings. Nimm die id als Dedup-Key für idempotente Verarbeitung — selbst wenn Redis erneut zustellt, kann dein Handler schon verarbeitete IDs überspringen.

Ungeackte Nachrichten bleiben in der PEL der Consumer Group — Redis trackt jeden ausgelieferten-aber-nicht-geackten Eintrag pro Consumer. Nach einem Consumer-Crash können diese Einträge manuell per XCLAIM gegen den Redis-Server re-claimed werden (außerhalb des Actor-Surface). Ein zukünftiges Actor-Command würde das einkapseln; aktuell führe XCLAIM aus einem separaten ioredis-Client oder Sidecar-Prozess aus.

Für At-Least-Once-Delivery im Normalfall: ack nach der Verarbeitung tellen — ein Consumer, der mitten in der Verarbeitung crasht, lässt den Eintrag einfach in der PEL stehen. Der nächste Consumer, der mit derselben Group startet, kann dort weitermachen, wo der vorige aufgehört hat (via XPENDING + XCLAIM).

Terminal-Fenster
npm install redis
# oder: bun add redis

Nutzt den offiziellen redis- (node-redis) v4+-Client.

Drei gute Einsatzfälle:

  1. Bestehende Redis-Infrastruktur — Streaming hinzufügen, ohne einen neuen Broker auszurollen.
  2. Anforderungen an Sub-Millisekunden-Latenz — Redis ist schnell.
  3. Kleinere Workloads — Millionen pro Tag, nicht Milliarden. Für riesige Skalierung ist Kafka zweckgebaut.

Nicht die richtige Form für:

  • Langzeit-Retention — Redis Streams können persistieren, sind aber nicht für jahrelange Retention ausgelegt.
  • Massive Parallelität — Redis ist single-threaded; ein Redis-Server wird zum Bottleneck.