コンテンツにスキップ
日本語

Redis Streams

このコンテンツはまだ日本語訳がありません。

RedisStreamsActor integrates with Redis Streams (the XADD / XREAD / XREADGROUP family). Right shape when Redis is already in your infrastructure and you want streaming without deploying Kafka or NATS.

import { ActorSystem, Props, RedisStreamsActor } from 'actor-ts';
const redis = system.spawn(
Props.create(() => new RedisStreamsActor({
url: 'redis://localhost:6379',
streams: ['orders', 'payments'],
// Consumer-group wiring + consumer-actor target live in
// settings — there is no runtime `subscribe` surface.
consumerGroup: { group: 'order-processors', consumer: 'worker-1' },
target: orderHandler,
})),
'redis',
);
// Publish (XADD under the hood):
redis.tell({
kind: 'publish',
publish: {
stream: 'orders',
fields: { sku: 'book-1', quantity: '2' },
},
});
interface RedisStreamsActorSettings extends BrokerCommonSettings {
url: string; // 'redis://host:6379'
streams?: ReadonlyArray<string>; // streams to consume
consumerGroup?: {
group: string;
consumer: string;
createIfMissing?: boolean; // default true
};
blockMs?: number; // XREADGROUP block; default 5_000
target?: ActorRef<RedisStreamEntry>; // consumer-actor for inbound entries
}
// No consumerGroup → uses XREAD
new RedisStreamsActor({ url, streams: ['events'], target: handler });

Reads from the stream’s $ (latest) position by default — every consumer sees every message after the subscribe. No ack; no work-sharing.

Good for broadcast-style distribution where every consumer should process every message.

new RedisStreamsActor({
url,
streams: ['orders'],
consumerGroup: { group: 'workers', consumer: 'worker-1' },
target: handler,
});

Consumer-group mode:

  • Work-sharing — each message goes to one consumer in the group.
  • Ack semantics — consumers XACK after processing; un-acked messages are tracked in the pending entries list (PEL) and can be re-delivered.
  • Multiple workers — N actor instances in the same group share the workload.

This is the right mode for work queues — orders go to one worker, processed, acked, done.

class OrderProcessor extends Actor<RedisStreamEntry> {
constructor(private readonly redis: ActorRef<RedisStreamsCmd>) { super(); }
override async onReceive(entry: RedisStreamEntry): Promise<void> {
const order = JSON.parse(entry.fields.body);
await processOrder(order);
this.redis.tell({ kind: 'ack', stream: entry.stream, id: entry.id });
}
}

The entry:

interface RedisStreamEntry {
stream: string;
id: string; // e.g. "1684923847-0"
fields: Record<string, string>; // your XADD payload
}

Stream IDs are <millis>-<seq> strings. Use the id as a dedup key for idempotent processing — even if Redis re-delivers, your handler can skip already-processed IDs.

Un-acked messages stay in the consumer group’s PEL — Redis tracks every delivered-but-not-acked entry per consumer. After a consumer crash, those entries can be manually re-claimed via XCLAIM against the Redis server (outside the actor surface). A future actor command would wrap that; for now, run XCLAIM from a separate ioredis client or use a sidecar process.

For at-least-once delivery in the common path: tell ack after processing, and a consumer that crashes mid-processing simply leaves the entry in the PEL — the next consumer that starts with the same group can pick up where the previous one stopped (via XPENDING + XCLAIM).

Terminal window
npm install redis
# or: bun add redis

Uses the official redis (node-redis) v4+ client.

Three good fits:

  1. Existing Redis infrastructure — adding streaming without deploying a new broker.
  2. Sub-millisecond latency requirements — Redis is fast.
  3. Smaller workloads — millions per day, not billions. For huge scale, Kafka is purpose-built.

Not the right shape for:

  • Long-term retention — Redis Streams can persist but aren’t designed for years-long retention.
  • Massive parallelism — Redis is single-threaded; one Redis server bottlenecks.