
Redis Streams

RedisStreamsActor integrates with Redis Streams (the XADD / XREAD / XREADGROUP family). It's the right shape when Redis is already in your infrastructure and you want streaming without deploying Kafka or NATS.

```typescript
import { ActorSystem, Props, RedisStreamsActor } from 'actor-ts';

// Assumes an ActorSystem instance `system` is already running.
const redis = system.actorOf(
  Props.create(() => new RedisStreamsActor({
    url: 'redis://localhost:6379',
    consumerGroup: 'order-processors',
    consumerName: 'worker-1',
    streams: ['orders', 'payments'],
  })),
  'redis',
);

// Subscribe (consumer-group reads); `handler` is your message handler:
redis.tell({ kind: 'subscribe-streams', subscriber: handler });

// Publish:
redis.tell({
  kind: 'xadd',
  stream: 'orders',
  fields: { sku: 'book-1', quantity: '2' },
});
```
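Redis stream fields are string-to-string maps, so structured payloads have to be serialized before XADD. A minimal sketch — `toFields` is an illustrative helper, not part of the API; the `body` field name matches the consumer example later on this page:

```typescript
// Redis stream fields are string→string, so serialize structured payloads.
// `toFields` is an illustrative helper; the 'body' field name is a convention
// that the consumer example below also uses.
function toFields(payload: unknown): Record<string, string> {
  return { body: JSON.stringify(payload) };
}

// Usage with the actor from above:
//   redis.tell({ kind: 'xadd', stream: 'orders', fields: toFields(order) });
```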
```typescript
interface RedisStreamsActorSettings extends BrokerCommonSettings {
  url: string;
  consumerGroup?: string;  // required for XREADGROUP mode
  consumerName?: string;   // unique within the group
  streams?: string[];      // streams to consume from
  blockTimeoutMs?: number; // XREAD block timeout; default 5_000
  count?: number;          // batch size; default 100
  password?: string;
  tls?: boolean;
}
```
```typescript
// No consumerGroup → uses XREAD
new RedisStreamsActor({ url, streams: ['events'] });
```

Reads from the stream’s $ (latest) position by default — every consumer sees every message after the subscribe. No ack; no work-sharing.

Good for broadcast-style distribution where every consumer should process every message.

```typescript
new RedisStreamsActor({
  url,
  streams: ['orders'],
  consumerGroup: 'workers',
  consumerName: 'worker-1',
});
```

Consumer-group mode:

  • Work-sharing — each message goes to one consumer in the group.
  • Ack semantics — consumers XACK after processing; un-acked messages are tracked in the pending entries list (PEL) and can be re-delivered.
  • Multiple workers — N actor instances in the same group share the workload.

This is the right mode for work queues — orders go to one worker, processed, acked, done.

```typescript
class OrderProcessor extends Actor<RedisStreamEntry> {
  // Inject the broker ref so the handler can ack; how you wire it up is your choice.
  constructor(private redis: { tell(msg: unknown): void }) {
    super();
  }

  override async onReceive(entry: RedisStreamEntry): Promise<void> {
    const order = JSON.parse(entry.fields.body);
    await processOrder(order);
    // Ack only after processing succeeds.
    this.redis.tell({ kind: 'xack', stream: entry.stream, id: entry.id });
  }
}
```

The entry:

```typescript
interface RedisStreamEntry {
  stream: string;
  id: string; // e.g. "1684923847-0"
  fields: Record<string, string>; // your XADD payload
}
```

Stream IDs are <millis>-<seq> strings. Use the id as a dedup key for idempotent processing — even if Redis re-delivers, your handler can skip already-processed IDs.
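The skip-already-processed check can be sketched in a few lines. The in-memory Set here is illustrative only — a production dedup store would be something durable, like a Redis SET or a database table:

```typescript
// Minimal idempotency sketch keyed on the stream entry ID.
// The in-memory Set is illustrative; use durable storage in production.
const processed = new Set<string>();

function shouldProcess(entryId: string): boolean {
  if (processed.has(entryId)) return false; // re-delivered entry; skip it
  processed.add(entryId);
  return true;
}
```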

Un-acked messages stay in the consumer group’s PEL. After a consumer crash, another consumer can claim those messages:

```typescript
redis.tell({
  kind: 'xclaim',
  stream: 'orders',
  group: 'workers',
  consumer: 'worker-1',
  ids: ['1684923847-0'],
  minIdleMs: 60_000, // only claim if idle > 60s
});
```

For automatic re-delivery, the framework’s broker-actor base class handles common cases — see the API reference for the manual xclaim flow.

```shell
npm install redis
# or: bun add redis
```

Uses the official redis (node-redis) v4+ client.

Three good fits:

  1. Existing Redis infrastructure — adding streaming without deploying a new broker.
  2. Sub-millisecond latency requirements — Redis serves reads and writes from memory.
  3. Smaller workloads — millions per day, not billions. For huge scale, Kafka is purpose-built.

Not the right shape for:

  • Long-term retention — Redis Streams can persist but aren’t designed for years-long retention.
  • Massive parallelism — Redis executes commands on a single thread, so a single server's throughput is a hard ceiling.