Zum Inhalt springen
Deutsch

Kafka

KafkaActor integriert mit Apache Kafka. Umschließt intern kafkajs; du importierst es nicht direkt.

import { ActorSystem, Props, KafkaActor } from 'actor-ts';
const system = ActorSystem.create('my-app');
const kafka = system.spawn(
Props.create(() => new KafkaActor({
brokers: ['kafka-1:9092', 'kafka-2:9092'],
clientId: 'my-app',
// Consumer-Actor-Target wird einmalig hier konfiguriert.
consumer: {
groupId: 'my-app-orders',
topics: ['orders', 'payments'],
target: orderHandler,
},
})),
'kafka',
);
// Ein weiteres Topic zur Laufzeit hinzufügen (das `topics` aus dem
// Konstruktor deckt den Normalfall ab; `subscribe` ist für späte
// Ergänzungen).
kafka.tell({ kind: 'subscribe', topic: 'audit' });
// Publishen:
kafka.tell({ kind: 'publish', publish: { topic: 'audit', value: 'something' } });
interface KafkaActorSettings extends BrokerCommonSettings {
brokers: string[] | string; // Bootstrap-Server
clientId?: string;
sasl?: {
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
ssl?: boolean;
producer?: {
idempotent?: boolean;
allowAutoTopicCreation?: boolean;
};
consumer?: {
groupId?: string;
fromBeginning?: boolean;
commitMode?: 'auto' | 'manual';
commitTimeoutMs?: number;
topics?: ReadonlyArray<string>;
target?: ActorRef<KafkaRecord>; // Consumer-Actor für eingehende Records
};
}

Der Parent BrokerCommonSettings gibt dir Reconnect, Buffer, Breaker — siehe BrokerActor-Basis.

kafka.tell({
kind: 'publish',
publish: {
topic: 'orders',
key: orderId, // optional — partitioniert nach Key
value: JSON.stringify(order),
headers: { 'x-trace': traceId },
},
});

Bei getrennter Verbindung gepuffert; beim Reconnect in Reihenfolge geflusht.

Für strikte Reihenfolge innerhalb einer Partition setze producer.idempotent: true und gib einen konsistenten key für zusammengehörige Records — kafkajs dedupliziert Retries innerhalb der Producer-Session.

// Das Consumer-Actor-Target wird über die Settings verdrahtet
// (siehe oben); `subscribe` ergänzt zur Laufzeit nur weitere
// Topics, wenn nötig.
kafka.tell({ kind: 'subscribe', topic: 'orders' });
class OrderHandler extends Actor<KafkaRecord> {
override onReceive(record: KafkaRecord): void {
// record.topic / partition / offset / key / value / timestamp / headers
const order = JSON.parse(new TextDecoder().decode(record.value!));
this.process(order);
}
}

Der Actor liefert jeden konsumierten Record an consumer.target. Für Kafkas Consumer-Group-Work-Sharing nimm mehrere Actor-Instanzen mit derselben groupId — jede Instanz übernimmt eine Teilmenge der Partitionen.

consumer: { commitMode: 'auto' } // at-least-once
consumer: { commitMode: 'manual' } // exactly-once-with-processing

auto (Default): kafkajs committed, nachdem der Handler zurückkehrt. Crash zwischen Handler und Commit → erneute Auslieferung beim nächsten Start. At-least-once. Handler müssen idempotent sein.

manual: der Consumer pausiert bei jedem Record und wartet auf eine explizite Commit-Nachricht:

override async onReceive(record: KafkaRecord): Promise<void> {
await processIdempotently(record);
this.kafka.tell({
kind: 'commit',
topic: record.topic,
partition: record.partition,
offset: record.offset,
});
}

Liefert Exactly-once-with-processing-Semantik, wenn deine Verarbeitung selbst transaktional ist. commitTimeoutMs (Default 30 s) deckelt, wie lange die Pumpe wartet, bevor sie aufgibt und einen Rebalance auslöst.

new KafkaActor({
brokers: ['kafka-1:9093'],
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: process.env.KAFKA_USER!,
password: process.env.KAFKA_PASS!,
},
});

Sowohl TLS (ssl: true) als auch SASL-Credentials werden an kafkajs durchgereicht. Nimm Env-Variablen für Secrets; hardcode sie nicht in application.conf.

Terminal-Fenster
npm install kafkajs
# oder: bun add kafkajs

kafkajs ist eine Peer-Dependency — das Framework bundelt es nicht. Du installierst es, wenn du KafkaActor nutzt.

Drei primäre Anwendungsfälle:

  1. Durables Event-Streaming — mehrere Consumer lesen denselben Stream mit verschiedenen Offsets; wiederabspielbare Historie.
  2. Producer und Consumer entkoppeln — High-Volume-Downstreams, die nicht das Tempo des Producers vorgeben sollten.
  3. Brücken zu bestehender Kafka-Infrastruktur — actor-ts an ein bereits Kafka nutzendes System anbinden.

Für clusterinternes Pub/Sub bevorzuge DistributedPubSub — keine Broker- Abhängigkeit.