
KafkaActor

Defined in: src/io/broker/KafkaActor.ts:166

Kafka producer and consumer in one actor, backed by kafkajs. When consumer.groupId is set, a consumer is started after connectImpl and each consumed record is delivered to target. For producer-only use, leave consumer and topics empty.

Offset-commit semantics.

  • commitMode: 'auto' (default): kafkajs commits the offset after each handler returns successfully, which gives at-least-once delivery. Cheap, and fine for idempotent handlers.
  • commitMode: 'manual': the pump pauses on each message and waits for an explicit commit command from the handler (#2). The handler must send exactly one commit (or nack) per delivered record. If neither arrives within commitTimeoutMs, the pump rejects internally, kafkajs treats the partition as failed, and the record is redelivered on rebalance. This produces "exactly-once-with-processing" behavior: an offset is committed only after the handler has explicitly committed its record, and a crash or nack redelivers it. Because a crash between the side effect and the commit also redelivers, handlers should still tolerate an occasional duplicate.

const kafka = system.actorOf(Props.create(() => new KafkaActor({
  brokers: ['kafka:9092'],
  consumer: { groupId: 'orders', commitMode: 'manual' },
  topics: ['orders'],
  target: orderProcessor,
})));

class OrderProcessor extends Actor {
  constructor(private readonly kafka: ActorRef) { super(); }

  async onReceive(rec: KafkaRecord) {
    try {
      await db.insertOrder(JSON.parse(rec.value!.toString()));
      // Processing succeeded: acknowledge so the offset is committed.
      this.kafka.tell({ kind: 'commit', topic: rec.topic, partition: rec.partition, offset: rec.offset });
    } catch (e) {
      // Processing failed: ask for redelivery.
      this.kafka.tell({ kind: 'nack', topic: rec.topic, partition: rec.partition, offset: rec.offset });
    }
  }
}
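The manual-commit handshake described above (wait for exactly one commit or nack per record, reject after commitTimeoutMs) can be pictured with a small sketch. This is not the KafkaActor source: `PendingAcks`, `wait`, and `settle` are hypothetical names introduced only for illustration, and the real pump may track pending records differently.

```typescript
// Hypothetical helper, not part of the documented API.
type Ack = 'commit' | 'nack';

class PendingAcks {
  private readonly waiters = new Map<string, (ack: Ack) => void>();

  // Key a record by its coordinates so a later commit/nack can find it.
  private static key(topic: string, partition: number, offset: string): string {
    return `${topic}/${partition}/${offset}`;
  }

  // Called after a record is delivered: resolves on commit, rejects on
  // nack or when no acknowledgement arrives within timeoutMs.
  wait(topic: string, partition: number, offset: string, timeoutMs: number): Promise<void> {
    const k = PendingAcks.key(topic, partition, offset);
    return new Promise<void>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.waiters.delete(k);
        reject(new Error(`no commit or nack for ${k} within ${timeoutMs} ms`));
      }, timeoutMs);
      this.waiters.set(k, (ack) => {
        clearTimeout(timer);
        this.waiters.delete(k);
        if (ack === 'commit') resolve();
        else reject(new Error(`nack for ${k}`));
      });
    });
  }

  // Called when a commit or nack command arrives.
  // Returns false for an unknown or already-settled record.
  settle(ack: Ack, topic: string, partition: number, offset: string): boolean {
    const waiter = this.waiters.get(PendingAcks.key(topic, partition, offset));
    if (!waiter) return false;
    waiter(ack);
    return true;
  }
}
```

Under these assumptions, the pump would await `wait(...)` inside the kafkajs message handler, so a rejection surfaces to kafkajs as a handler failure. The `false` return from `settle` makes a second commit or nack for the same record detectable rather than silently accepted.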

new KafkaActor(settings?): KafkaActor

Defined in: src/io/broker/KafkaActor.ts:178

Parameters: settings: Partial<KafkaActorSettings> = {}

Returns: KafkaActor

Overrides: BrokerActor<KafkaActorSettings, KafkaCmd, KafkaPublish>.constructor

onReceive(cmd): void

Defined in: src/io/broker/KafkaActor.ts:339

Main message handler. Receives each envelope dequeued from the mailbox. A thrown error (sync or async) is caught by the supervisor.

Parameters: cmd: KafkaCmd

Returns: void

Overrides: BrokerActor.onReceive


postRestart(_reason): void | Promise<void>

Defined in: src/Actor.ts:55

Called on the fresh instance after a restart. Default: call preStart().

Parameters: _reason: Error

Returns: void | Promise<void>

Inherited from: BrokerActor.postRestart


postStop(): Promise<void>

Defined in: src/io/broker/BrokerActor.ts:241

Called after the actor has been terminated. Children are already stopped.

Returns: Promise<void>

Inherited from: BrokerActor.postStop


preRestart(_reason, _message?): void | Promise<void>

Defined in: src/Actor.ts:50

Called before a restart, on the instance about to be thrown away. The default stops children and then calls postStop().

Parameters: _reason: Error, _message?: KafkaCmd

Returns: void | Promise<void>

Inherited from: BrokerActor.preRestart
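The default hook behavior described for preRestart and postRestart implies a particular call order during a restart. The toy model below traces that order. `ToyActor` and `restart` are hypothetical stand-ins, not the framework's classes, and the hooks are synchronous here for brevity even though the documented ones may return promises.

```typescript
// Toy model of the restart sequence; names are illustrative only.
const log: string[] = [];

class ToyActor {
  readonly id: number;
  constructor(id: number) { this.id = id; }

  preStart(): void { log.push(`#${this.id} preStart`); }
  postStop(): void { log.push(`#${this.id} postStop`); }

  // Documented default: stop children, then call postStop().
  preRestart(_reason: Error): void {
    log.push(`#${this.id} stop children`);
    this.postStop();
  }

  // Documented default: call preStart().
  postRestart(_reason: Error): void { this.preStart(); }
}

// What a supervisor might do on failure, under the assumptions above.
function restart(old: ToyActor, reason: Error, create: () => ToyActor): ToyActor {
  old.preRestart(reason);     // runs on the instance about to be discarded
  const fresh = create();     // a new instance takes its place
  fresh.postRestart(reason);  // runs on the fresh instance
  return fresh;
}

const first = new ToyActor(1);
first.preStart();
restart(first, new Error('boom'), () => new ToyActor(2));
console.log(log.join('\n'));
```

In this model, cleanup placed in postStop and setup placed in preStart both run across a restart without overriding either restart hook.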


preStart(): Promise<void>

Defined in: src/io/broker/BrokerActor.ts:235

Called after construction and before the first message is processed.

Returns: Promise<void>

Inherited from: BrokerActor.preStart


supervisorStrategy(): SupervisorStrategy

Defined in: src/Actor.ts:63

Supervisor strategy for this actor’s children. Defaults to restart, up to 10 times per minute, then stop.

Returns: SupervisorStrategy

Inherited from: BrokerActor.supervisorStrategy
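The default policy ("restart, up to 10 times per minute, then stop") can be read as a restart budget over a time window. The sketch below assumes a sliding 60-second window. The source does not say whether the real strategy uses a sliding or fixed window, and `RestartWindow` is a hypothetical helper, not the library's SupervisorStrategy.

```typescript
type Decision = 'restart' | 'stop';

// Hypothetical helper illustrating a "max N restarts per window" policy.
class RestartWindow {
  private failures: number[] = [];
  private readonly maxRestarts: number;
  private readonly windowMs: number;

  constructor(maxRestarts = 10, windowMs = 60_000) {
    this.maxRestarts = maxRestarts;
    this.windowMs = windowMs;
  }

  // Record a child failure at time `now` (in ms) and decide what to do.
  onFailure(now: number): Decision {
    // Keep only restarts that are still inside the window.
    this.failures = this.failures.filter((t) => now - t < this.windowMs);
    if (this.failures.length >= this.maxRestarts) return 'stop';
    this.failures.push(now);
    return 'restart';
  }
}
```

Passing `now` explicitly instead of reading the clock inside the method keeps the policy deterministic and easy to test.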