Kafka

KafkaActor integrates the actor system with Apache Kafka. It wraps kafkajs internally; you don’t import kafkajs directly.

import { ActorSystem, Props, KafkaActor } from 'actor-ts';

const system = ActorSystem.create('my-app');

const kafka = system.actorOf(
  Props.create(() => new KafkaActor({
    brokers: ['kafka-1:9092', 'kafka-2:9092'],
    clientId: 'my-app',
    consumer: {
      groupId: 'my-app-orders',
      topics: ['orders', 'payments'],
    },
  })),
  'kafka',
);
// Subscribe to consume:
kafka.tell({ kind: 'subscribe', subscriber: handler });
// Publish:
kafka.tell({ kind: 'publish', topic: 'audit', value: 'something' });
interface KafkaActorSettings extends BrokerCommonSettings {
  brokers: string[] | string; // bootstrap servers
  clientId?: string;
  sasl?: {
    mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
    username: string;
    password: string;
  };
  ssl?: boolean;
  producer?: {
    idempotent?: boolean;
    allowAutoTopicCreation?: boolean;
  };
  consumer?: {
    groupId?: string;
    fromBeginning?: boolean;
    commitMode?: 'auto' | 'manual';
    commitTimeoutMs?: number;
    topics?: string[];
  };
}

The BrokerCommonSettings parent provides the reconnect, buffering, and circuit-breaker settings — see the BrokerActor base documentation.

kafka.tell({
  kind: 'publish',
  topic: 'orders',
  key: orderId, // optional — partitions by key
  value: JSON.stringify(order),
  headers: { 'x-trace': traceId },
});

Buffered when disconnected; flushed in order on reconnect.
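The buffering behavior can be illustrated with a minimal stand-in (a sketch only — the framework's real buffer also applies the BrokerCommonSettings size limits and breaker):

```typescript
// Sketch of an in-order publish buffer: messages queue while the
// broker is unreachable and flush in publish order on reconnect.
type PublishMsg = { topic: string; value: string; key?: string };

class PublishBuffer {
  private queue: PublishMsg[] = [];
  private connected = false;

  constructor(private send: (msg: PublishMsg) => void) {}

  publish(msg: PublishMsg): void {
    if (this.connected) {
      this.send(msg);
    } else {
      this.queue.push(msg); // hold while disconnected
    }
  }

  onReconnect(): void {
    this.connected = true;
    // flush in original publish order
    for (const msg of this.queue.splice(0)) this.send(msg);
  }

  onDisconnect(): void {
    this.connected = false;
  }
}
```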

For strict ordering within a partition, set producer.idempotent: true and give related records a consistent key — kafkajs deduplicates retries within the producer session.
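The ordering guarantee rests on Kafka's keyed partitioning: a record's partition is derived from a hash of its key, so equal keys always land on the same partition. A simplified sketch of that invariant (kafkajs's real default partitioner uses murmur2, not this toy hash):

```typescript
// Simplified key -> partition mapping. The invariant it demonstrates
// is the real one: equal keys always map to the same partition, so
// records sharing a key stay in order relative to each other.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) | 0;
  }
  return Math.abs(hash) % numPartitions;
}
```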

class OrderHandler extends Actor<KafkaRecord> {
  override onReceive(record: KafkaRecord): void {
    // record.topic / partition / offset / key / value / timestamp / headers
    const order = JSON.parse(new TextDecoder().decode(record.value!));
    this.process(order);
  }
}

const orderHandler = system.actorOf(
  Props.create(() => new OrderHandler()),
  'order-handler',
);
kafka.tell({ kind: 'subscribe', subscriber: orderHandler });

Each subscriber receives every record for the topics the actor’s consumer is subscribed to. Multiple subscribers each see every record — they don’t share work. For Kafka’s consumer-group work-sharing, use multiple actor instances with the same groupId.
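The two delivery patterns can be sketched in plain TypeScript (an illustration of fan-out versus consumer-group work-sharing, not the framework's dispatch code):

```typescript
type Msg = { key: string; value: string };

// Fan-out (what `subscribe` gives you): every subscriber sees every record.
function fanOut(records: Msg[], subscribers: Array<(r: Msg) => void>): void {
  for (const r of records) for (const sub of subscribers) sub(r);
}

// Work-sharing (a Kafka consumer group): each record is delivered to
// exactly one member, modeled here by hashing the key across members.
function shareAcrossGroup(records: Msg[], members: Array<(r: Msg) => void>): void {
  for (const r of records) {
    let h = 0;
    for (const ch of r.key) h = (h * 31 + ch.charCodeAt(0)) | 0;
    members[Math.abs(h) % members.length](r);
  }
}
```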

consumer: { commitMode: 'auto' } // at-least-once
consumer: { commitMode: 'manual' } // exactly-once-with-processing

auto (default): kafkajs commits after the handler returns. Crash between handler and commit → re-delivery on next start. At-least-once. Handlers must be idempotent.
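One common way to make a handler idempotent is to deduplicate on the (topic, partition, offset) triple, which uniquely identifies a record. A minimal sketch (`IdempotentHandler` is a hypothetical name, not a framework class):

```typescript
type Rec = { topic: string; partition: number; offset: string; value: string };

// Sketch: skip records already processed, so an at-least-once
// re-delivery after a crash-before-commit has no visible effect.
// A production version would persist `seen` alongside its results.
class IdempotentHandler {
  private seen = new Set<string>();
  public processed: string[] = [];

  onReceive(r: Rec): void {
    const id = `${r.topic}/${r.partition}/${r.offset}`;
    if (this.seen.has(id)) return; // duplicate delivery: skip
    this.seen.add(id);
    this.processed.push(r.value);
  }
}
```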

manual: the consumer pauses on each record and waits for an explicit commit message:

override async onReceive(record: KafkaRecord): Promise<void> {
  await processIdempotently(record);
  this.kafka.tell({
    kind: 'commit',
    topic: record.topic,
    partition: record.partition,
    offset: record.offset,
  });
}

Gives exactly-once-with-processing semantics if your processing is itself transactional. commitTimeoutMs (default 30 s) caps how long the pump waits before giving up and triggering a rebalance.
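The crash-safety argument can be seen in a minimal offset tracker (a sketch of the bookkeeping, not the framework's consumer internals): only an explicit commit advances the resume point, so a crash after processing but before commit replays from the last committed offset.

```typescript
// Per-partition committed-offset tracker. After a restart, consumption
// resumes at the offset after the last commit, never at the last
// delivered-but-uncommitted record.
class OffsetTracker {
  private committed = new Map<string, number>();

  commit(topic: string, partition: number, offset: number): void {
    const key = `${topic}/${partition}`;
    const prev = this.committed.get(key) ?? -1;
    if (offset > prev) this.committed.set(key, offset); // ignore stale commits
  }

  resumeFrom(topic: string, partition: number): number {
    // next offset to consume after a restart
    return (this.committed.get(`${topic}/${partition}`) ?? -1) + 1;
  }
}
```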

new KafkaActor({
  brokers: ['kafka-1:9093'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-512',
    username: process.env.KAFKA_USER!,
    password: process.env.KAFKA_PASS!,
  },
});

Both TLS (ssl: true) and SASL credentials pass through to kafkajs. Use env vars for secrets; don’t hard-code them in application.conf.

npm install kafkajs
# or: bun add kafkajs

kafkajs is a peer dependency — the framework doesn’t bundle it. You install it yourself when you use KafkaActor.

Three primary use cases:

  1. Durable event streaming — multiple consumers reading the same stream with different offsets; replayable history.
  2. Decoupling producers and consumers — high-volume downstream that shouldn’t pace the producer.
  3. Bridging existing Kafka infrastructure — connecting actor-ts to a system already using Kafka.

For in-cluster pub/sub, prefer DistributedPubSub — no broker dependency.