Kafka
KafkaActor integriert mit Apache Kafka. Umschließt intern
kafkajs; du importierst es nicht direkt.
import { ActorSystem, Props, KafkaActor } from 'actor-ts';
const system = ActorSystem.create('my-app');
const kafka = system.spawn( Props.create(() => new KafkaActor({ brokers: ['kafka-1:9092', 'kafka-2:9092'], clientId: 'my-app', // Consumer-Actor-Target wird einmalig hier konfiguriert. consumer: { groupId: 'my-app-orders', topics: ['orders', 'payments'], target: orderHandler, }, })), 'kafka',);
// Ein weiteres Topic zur Laufzeit hinzufügen (das `topics` aus dem// Konstruktor deckt den Normalfall ab; `subscribe` ist für späte// Ergänzungen).kafka.tell({ kind: 'subscribe', topic: 'audit' });
// Publishen:kafka.tell({ kind: 'publish', publish: { topic: 'audit', value: 'something' } });Settings
Abschnitt betitelt „Settings“interface KafkaActorSettings extends BrokerCommonSettings { brokers: string[] | string; // Bootstrap-Server clientId?: string; sasl?: { mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512'; username: string; password: string; }; ssl?: boolean; producer?: { idempotent?: boolean; allowAutoTopicCreation?: boolean; }; consumer?: { groupId?: string; fromBeginning?: boolean; commitMode?: 'auto' | 'manual'; commitTimeoutMs?: number; topics?: ReadonlyArray<string>; target?: ActorRef<KafkaRecord>; // Consumer-Actor für eingehende Records };}Der Parent BrokerCommonSettings gibt dir Reconnect, Buffer,
Breaker — siehe BrokerActor-Basis.
Producen
Abschnitt betitelt „Producen“kafka.tell({ kind: 'publish', publish: { topic: 'orders', key: orderId, // optional — partitioniert nach Key value: JSON.stringify(order), headers: { 'x-trace': traceId }, },});Bei getrennter Verbindung gepuffert; beim Reconnect in Reihenfolge geflusht.
Für strikte Reihenfolge innerhalb einer Partition setze
producer.idempotent: true und gib einen konsistenten key für
zusammengehörige Records — kafkajs dedupliziert Retries innerhalb
der Producer-Session.
Consumen
Abschnitt betitelt „Consumen“// Das Consumer-Actor-Target wird über die Settings verdrahtet// (siehe oben); `subscribe` ergänzt zur Laufzeit nur weitere// Topics, wenn nötig.kafka.tell({ kind: 'subscribe', topic: 'orders' });
class OrderHandler extends Actor<KafkaRecord> { override onReceive(record: KafkaRecord): void { // record.topic / partition / offset / key / value / timestamp / headers const order = JSON.parse(new TextDecoder().decode(record.value!)); this.process(order); }}Der Actor liefert jeden konsumierten Record an consumer.target.
Für Kafkas Consumer-Group-Work-Sharing nimm mehrere
Actor-Instanzen mit derselben groupId — jede Instanz
übernimmt eine Teilmenge der Partitionen.
Commit-Modi
Abschnitt betitelt „Commit-Modi“consumer: { commitMode: 'auto' } // at-least-onceconsumer: { commitMode: 'manual' } // exactly-once-with-processingauto (Default): kafkajs committed, nachdem der Handler
zurückkehrt. Crash zwischen Handler und Commit → erneute
Auslieferung beim nächsten Start. At-least-once. Handler müssen
idempotent sein.
manual: der Consumer pausiert bei jedem Record und wartet
auf eine explizite Commit-Nachricht:
override async onReceive(record: KafkaRecord): Promise<void> { await processIdempotently(record); this.kafka.tell({ kind: 'commit', topic: record.topic, partition: record.partition, offset: record.offset, });}Liefert Exactly-once-with-processing-Semantik, wenn deine
Verarbeitung selbst transaktional ist. commitTimeoutMs
(Default 30 s) deckelt, wie lange die Pumpe wartet, bevor sie
aufgibt und einen Rebalance auslöst.
Auth + TLS
Abschnitt betitelt „Auth + TLS“new KafkaActor({ brokers: ['kafka-1:9093'], ssl: true, sasl: { mechanism: 'scram-sha-512', username: process.env.KAFKA_USER!, password: process.env.KAFKA_PASS!, },});Sowohl TLS (ssl: true) als auch SASL-Credentials werden an
kafkajs durchgereicht. Nimm Env-Variablen für Secrets;
hardcode sie nicht in application.conf.
Peer-Dependency
Abschnitt betitelt „Peer-Dependency“npm install kafkajs# oder: bun add kafkajskafkajs ist eine Peer-Dependency — das Framework bundelt es
nicht. Du installierst es, wenn du KafkaActor nutzt.
Wann Kafka
Abschnitt betitelt „Wann Kafka“Drei primäre Anwendungsfälle:
- Durables Event-Streaming — mehrere Consumer lesen denselben Stream mit verschiedenen Offsets; wiederabspielbare Historie.
- Producer und Consumer entkoppeln — High-Volume-Downstreams, die nicht das Tempo des Producers vorgeben sollten.
- Brücken zu bestehender Kafka-Infrastruktur — actor-ts an ein bereits Kafka nutzendes System anbinden.
Für clusterinternes Pub/Sub bevorzuge DistributedPubSub — keine Broker- Abhängigkeit.
Wohin als Nächstes
Abschnitt betitelt „Wohin als Nächstes“- I/O-Übersicht — das große Bild.
- BrokerActor-Basis — der gemeinsame Lifecycle.
- DistributedPubSub — clusterinterne Alternative, wenn du kein Kafka brauchst.
- NATS / MQTT — alternative Broker.